In [None]:
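# Install required packages.
# NOTE: the next two commands install the GPU and the CPU build of PaddlePaddle;
# they are alternatives, so run only the one that matches your machine.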
!python -m pip install paddlepaddle-gpu==2.0.0 -i https://mirror.baidu.com/pypi/simple
!python -m pip install paddlepaddle -i https://pypi.tuna.tsinghua.edu.cn/simple
!pip install paddleocr opencv-python matplotlib scikit-image numpy


In [None]:
# Import necessary libraries
import os
import json
import cv2
import numpy as np
from paddleocr import PaddleOCR, draw_ocr

from matplotlib import pyplot as plt
from skimage import exposure, filters

# Preprocessing functions as defined above
def preprocess_historical_map(image):
    """Apply the baseline clean-up chain to a map image.

    The steps run in this order: grayscale conversion (a 3-dimensional input
    is converted with ``COLOR_BGR2GRAY``), CLAHE contrast equalisation,
    bilateral denoising, 3x3 sharpening, Gaussian adaptive thresholding,
    a morphological opening and finally a dilation.

    Args:
        image: Image array; either 3-dimensional (treated as BGR) or already
            single-channel.

    Returns:
        The single-channel image produced by the last step of the chain.
    """
    has_channels = len(image.shape) == 3
    grayscale = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) if has_channels else image.copy()

    # Local contrast equalisation followed by edge-preserving smoothing.
    equalizer = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
    smoothed = cv2.bilateralFilter(equalizer.apply(grayscale), 9, 75, 75)

    # 3x3 sharpening kernel: centre weight 9, all neighbours -1.
    sharpen_kernel = np.array([[-1, -1, -1],
                               [-1,  9, -1],
                               [-1, -1, -1]])
    crisp = cv2.filter2D(smoothed, -1, sharpen_kernel)

    thresholded = cv2.adaptiveThreshold(
        crisp,
        255,
        cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
        cv2.THRESH_BINARY,
        11,
        2,
    )

    # NOTE(review): both morphology steps use a 1x1 structuring element, which
    # looks like a no-op - confirm whether a larger kernel was intended.
    open_kernel = np.ones((1, 1), np.uint8)
    opened = cv2.morphologyEx(thresholded, cv2.MORPH_OPEN, open_kernel)

    dilate_kernel = np.ones((1, 1), np.uint8)
    return cv2.dilate(opened, dilate_kernel, iterations=1)


def advanced_map_preprocessing(image):
    """Advanced preprocessing for historical maps; returns an inverted binary image.

    The pipeline is: grayscale conversion (a 3-dimensional input is converted
    with ``COLOR_BGR2GRAY``), CLAHE contrast enhancement, bilateral filtering,
    Gaussian adaptive thresholding (block size 11, constant 7), a
    morphological opening, and a final bitwise inversion.

    Args:
        image: Image array; either 3-dimensional (treated as BGR) or already
            single-channel.

    Returns:
        The inverted, thresholded single-channel image.
    """
    if len(image.shape) == 3:
        gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    else:
        gray = image.copy()

    clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
    enhanced = clahe.apply(gray)

    # Edge-preserving smoothing before thresholding.
    filtered = cv2.bilateralFilter(enhanced, 9, 75, 75)

    # The threshold is taken on the full filtered image; no edge mask is
    # applied, so no Canny/mask computation is needed here.
    binary = cv2.adaptiveThreshold(
        filtered, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, cv2.THRESH_BINARY, 11, 7
    )

    # NOTE(review): a 1x1 structuring element looks like a no-op for the
    # opening - confirm whether a larger kernel was intended.
    kernel = np.ones((1, 1), np.uint8)
    cleaned = cv2.morphologyEx(binary, cv2.MORPH_OPEN, kernel)

    return cv2.bitwise_not(cleaned)


def adaptive_map_preprocessing(image):
    """Preprocess a map image with parameters chosen from its own statistics.

    The mean and standard deviation of the grayscale image select the CLAHE
    clip limit, the denoising filter and the adaptive-threshold parameters.
    The result is then unsharp-masked, thresholded and opened.

    Args:
        image: Image array; either 3-dimensional (treated as BGR) or already
            single-channel.

    Returns:
        The single-channel image after thresholding and opening.
    """
    grayscale = (
        cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        if len(image.shape) == 3
        else image.copy()
    )

    brightness = np.mean(grayscale)
    spread = np.std(grayscale)

    # Low-contrast images (small spread) get a stronger clip limit.
    clip_limit = 3.0 if spread < 50 else 2.0
    equalizer = cv2.createCLAHE(clipLimit=clip_limit, tileGridSize=(8, 8))
    boosted = equalizer.apply(grayscale)

    # High variation is treated as possible noise and gets a median filter;
    # otherwise a light Gaussian blur is used.
    if spread > 60:
        smoothed = cv2.medianBlur(boosted, 3)
    else:
        smoothed = cv2.GaussianBlur(boosted, (3, 3), 0)

    # Unsharp mask: 1.5 * smoothed - 0.5 * heavily blurred copy.
    heavy_blur = cv2.GaussianBlur(smoothed, (0, 0), 3.0)
    sharpened = cv2.addWeighted(smoothed, 1.5, heavy_blur, -0.5, 0)

    # Darker images use a larger block size and a larger constant.
    block_size, offset = (13, 4) if brightness < 100 else (11, 2)
    thresholded = cv2.adaptiveThreshold(
        sharpened,
        255,
        cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
        cv2.THRESH_BINARY,
        block_size,
        offset,
    )

    structuring_element = np.ones((1, 1), np.uint8)
    return cv2.morphologyEx(thresholded, cv2.MORPH_OPEN, structuring_element)


def text_enhanced_preprocessing(image):
    """Preprocessing focused on enhancing text in maps with complex backgrounds.

    The pipeline is: grayscale conversion (a 3-dimensional input is converted
    with ``COLOR_BGR2GRAY``), CLAHE contrast enhancement, 3x3 median blur,
    Laplacian edge response, a fixed threshold of 15 on that response, a 2x2
    dilation of the resulting edge mask, masking of the contrast-enhanced
    image with it, and Gaussian adaptive thresholding of the masked image.

    Args:
        image: Image array; either 3-dimensional (treated as BGR) or already
            single-channel.

    Returns:
        The adaptively thresholded single-channel image.
    """
    if len(image.shape) == 3:
        gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    else:
        gray = image.copy()

    clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
    contrast_enhanced = clahe.apply(gray)

    median_blurred = cv2.medianBlur(contrast_enhanced, 3)

    laplacian = cv2.Laplacian(median_blurred, cv2.CV_64F)

    # The float64 response can exceed 255. convertScaleAbs takes the absolute
    # value and saturates to uint8, whereas a plain np.uint8 cast would let
    # strong edges overflow to small values and drop below the threshold.
    laplacian = cv2.convertScaleAbs(laplacian)

    _, edges = cv2.threshold(laplacian, 15, 255, cv2.THRESH_BINARY)

    kernel = np.ones((2, 2), np.uint8)
    dilated_edges = cv2.dilate(edges, kernel, iterations=1)

    # Keep only the pixels that lie on (dilated) edges; everything else is 0.
    masked = cv2.bitwise_and(contrast_enhanced, contrast_enhanced, mask=dilated_edges)

    thresh = cv2.adaptiveThreshold(masked, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
                                  cv2.THRESH_BINARY, 11, 2)

    return thresh


def setup_optimized_ocr_model():
    """Create the PaddleOCR instance used for historical-map recognition.

    Returns:
        A ``PaddleOCR`` object constructed with the option set below.
    """
    ocr_options = {
        # English language model.
        "lang": 'en',
        # Turn on the angle classifier (meant for rotated text).
        "use_angle_cls": True,
        # Low detection threshold, to pick up more candidate text.
        "det_db_thresh": 0.1,
        # NOTE(review): presumably a recognition score cut-off - verify that
        # the installed PaddleOCR version actually honours `rec_thresh`.
        "rec_thresh": 0.45,
        # Box-level detection threshold (lowered for faint text).
        "det_db_box_thresh": 0.3,
        # Unclip ratio applied to detected text boxes.
        "det_db_unclip_ratio": 1.8,
        # Request GPU execution.
        "use_gpu": True,
        # Side-length limit applied to images before text detection.
        "det_limit_side_len": 2000,
        # Threshold for the angle classifier - TODO confirm exact semantics.
        "cls_thresh": 0.9,
    }
    return PaddleOCR(**ocr_options)


def process_image_with_multiple_methods(img_path, ocr_model):
    """Run OCR on several preprocessed variants of one image and pick the best.

    Each variant is written to a ``preprocessed`` directory next to the image
    as ``<base>_<method>.jpg`` and then passed to ``ocr_model.ocr``. The best
    variant is the one with the most detected text regions; ties are broken
    by the higher average confidence.

    Args:
        img_path: Path of the image file to process.
        ocr_model: Object exposing ``ocr(image, cls=True)``; the result is
            read as ``result[0]``, a list of ``(box, (text, score))`` entries.

    Returns:
        A tuple ``(best_result, best_method, all_results)``. If the image
        cannot be read this is ``(None, None, {})``. If no variant yields any
        text, ``best_result`` is ``None`` and ``best_method`` is
        ``"original"``.
    """
    original_img = cv2.imread(img_path)
    if original_img is None:
        print(f"Error: Could not read image at {img_path}")
        return None, None, {}

    # os.path.join keeps the output beside the image even when img_path has no
    # directory part; plain string concatenation would yield "/preprocessed".
    output_dir = os.path.join(os.path.dirname(img_path), "preprocessed")
    os.makedirs(output_dir, exist_ok=True)

    # The part of the file name before the first dot; the same rule is used
    # when the saved variant is looked up again later.
    base_name = os.path.basename(img_path).split('.')[0]

    img_rgb = cv2.cvtColor(original_img, cv2.COLOR_BGR2RGB)

    # Every variant is stored as a 3-channel RGB image.
    methods = {
        "original": img_rgb,
        "basic_preprocessing": cv2.cvtColor(preprocess_historical_map(original_img), cv2.COLOR_GRAY2RGB),
        "advanced_preprocessing": cv2.cvtColor(advanced_map_preprocessing(original_img), cv2.COLOR_GRAY2RGB),
        "adaptive_preprocessing": cv2.cvtColor(adaptive_map_preprocessing(original_img), cv2.COLOR_GRAY2RGB),
        "text_enhanced": cv2.cvtColor(text_enhanced_preprocessing(original_img), cv2.COLOR_GRAY2RGB)
    }

    all_results = {}
    best_method = "original"
    best_count = 0
    best_confidence = 0
    best_result = None

    for method_name, processed_img in methods.items():
        print(f"Processing with {method_name}...")

        processed_path = os.path.join(output_dir, f"{base_name}_{method_name}.jpg")
        written = cv2.imwrite(processed_path,
                              cv2.cvtColor(processed_img, cv2.COLOR_RGB2BGR))
        if not written:
            # imwrite reports failure through its return value, not an exception.
            print(f"Warning: Could not write preprocessed image to {processed_path}")

        result = ocr_model.ocr(processed_img, cls=True)
        all_results[method_name] = result

        if not (result and result[0]):
            print(f"  - No text detected with {method_name}")
            continue

        # result[0] is non-empty here, so the division is safe.
        text_count = len(result[0])
        avg_confidence = sum(res[1][1] for res in result[0]) / text_count
        print(f"  - Detected {text_count} text regions with avg confidence {avg_confidence:.4f}")

        # Prefer more detections; break ties on average confidence.
        if text_count > best_count or (text_count == best_count and avg_confidence > best_confidence):
            best_count = text_count
            best_confidence = avg_confidence
            best_method = method_name
            best_result = result

    print(f"\nBest method: {best_method} with {best_count} texts detected")
    return best_result, best_method, all_results

def _find_annotation_font():
    """Return the path of the first candidate font file that exists, else None."""
    try:
        script_dir = os.path.dirname(os.path.abspath(__file__))
    except NameError:
        # __file__ is undefined in interactive/notebook sessions.
        script_dir = os.getcwd()

    candidates = [
        os.path.join(script_dir, 'PaddleOCR', 'doc', 'fonts', 'latin.ttf'),
        # Current working directory
        os.path.join(os.getcwd(), 'latin.ttf'),
        # System fonts (Linux/Windows/Mac)
        '/usr/share/fonts/truetype/freefont/FreeMono.ttf',
        'C:/Windows/Fonts/arial.ttf',
        '/System/Library/Fonts/Supplemental/Arial.ttf',
    ]
    return next((path for path in candidates if os.path.isfile(path)), None)


def _save_annotated_image(image_rgb, ocr_lines, font_path, title, output_path):
    """Draw the OCR detections on image_rgb and save the figure to output_path."""
    boxes = [line[0] for line in ocr_lines]
    texts = [line[1][0] for line in ocr_lines]
    scores = [line[1][1] for line in ocr_lines]

    try:
        if font_path is None:
            # Without a font file the text panel cannot be rendered, so only
            # the boxes are drawn.
            annotated = draw_ocr(image_rgb, boxes, scores=scores)
        else:
            annotated = draw_ocr(image_rgb, boxes, texts, scores, font_path=font_path)
    except Exception as e:
        # Annotation is best-effort: fall back to the plain image.
        print(f"Annotation error: {e}")
        annotated = image_rgb

    plt.figure(figsize=(12, 8))
    try:
        plt.title(title)
        plt.imshow(annotated)
        plt.axis("off")
        plt.savefig(output_path, bbox_inches='tight')
    finally:
        # Always release the figure, even if saving fails.
        plt.close()


def process_folder(folder_path, output_json_path="map_ocr_results.json"):
    """Process all images in a folder, apply preprocessing, and save OCR results.

    For every ``.jpg``/``.jpeg``/``.png``/``.bmp`` file in ``folder_path``
    (in sorted order), the best preprocessing variant is chosen with
    ``process_image_with_multiple_methods``. An annotated image is then saved
    under ``<folder_path>/processed_results`` and the detections are appended
    to the result list. Images for which no text is found are skipped.

    Args:
        folder_path: Directory containing the images.
        output_json_path: Destination of the JSON result file; its parent
            directory is created if it does not exist.

    Returns:
        The list written to the JSON file. Each entry is a dict with an
        ``"image"`` path and a ``"groups"`` list that holds one group
        containing every detected text item of that image.
    """
    ocr_model = setup_optimized_ocr_model()

    font_path = _find_annotation_font()
    if font_path is None:
        print("Warning: No font file found - annotations will show boxes without text")

    # Sorted so that the output order is reproducible across runs/platforms.
    image_files = sorted(
        f for f in os.listdir(folder_path)
        if f.lower().endswith(('.jpg', '.jpeg', '.png', '.bmp'))
    )

    output_dir = os.path.join(folder_path, "processed_results")
    os.makedirs(output_dir, exist_ok=True)

    ocr_results = []

    for image_file in image_files:
        img_path = os.path.join(folder_path, image_file)
        print(f"\nProcessing: {img_path}")

        best_result, best_method, _ = process_image_with_multiple_methods(
            img_path, ocr_model
        )

        # best_result is None when the image is unreadable or no text was found.
        if not best_result or not best_result[0]:
            print(f"Failed to process {image_file}")
            continue

        # Re-load the variant that was saved for the winning method.
        base_name = os.path.basename(img_path).split('.')[0]
        best_img_path = os.path.join(
            folder_path, "preprocessed", f"{base_name}_{best_method}.jpg"
        )
        best_img = cv2.imread(best_img_path)

        if best_img is None:
            # Keep the OCR output even if the picture cannot be annotated.
            print(f"Warning: Could not read {best_img_path}; skipping annotation")
        else:
            output_path = os.path.join(output_dir, f"{base_name}_annotated.jpg")
            _save_annotated_image(
                cv2.cvtColor(best_img, cv2.COLOR_BGR2RGB),
                best_result[0],
                font_path,
                f"OCR Results for {image_file}",
                output_path,
            )
            print(f"Saved annotated image to {output_path}")

        groups = [
            {
                "vertices": [[float(x), float(y)] for x, y in box],
                "text": text,
                "illegible": False,
                "truncated": False
            }
            for box, (text, _score) in best_result[0]
        ]

        ocr_results.append({
            # Adjust the prefix to the path that should appear in the JSON.
            "image": os.path.join('rumsey/val/', image_file).replace("\\", "/"),
            "groups": [groups]
        })

    # Create the target directory first so results are not lost at the very end.
    json_dir = os.path.dirname(output_json_path)
    if json_dir:
        os.makedirs(json_dir, exist_ok=True)

    with open(output_json_path, 'w', encoding='utf-8') as f:
        json.dump(ocr_results, f, indent=4)

    print(f"\nSaved OCR results to {output_json_path}")
    return ocr_results


def main():
    """Process every image in the configured folder and report the count."""
    # Enter the path to the directory of images. The value must be a plain
    # path: quote characters inside the string would become part of the path.
    folder_path = 'path/to/images'

    # Process all images in the folder
    results = process_folder(folder_path, "path/to/save/the/results.json")

    print("\nProcessing complete!")
    print(f"Processed {len(results)} images.")

if __name__ == "__main__":
    main()
