# Railway Image Preprocessing Analysis

This notebook applies 5 different preprocessing techniques to a folder of railway images and analyzes the results.

## Analysis Goals:
- **Batch Processing**: Apply preprocessing to multiple images from a folder
- **Method Comparison**: Compare 5 different preprocessing techniques
- **Visual Analysis**: Display example images and their preprocessing results
- **Result Storage**: Save processed images for further analysis

## Preprocessing Methods:
1. **Histogram Equalization** - Enhances contrast
2. **CLAHE** - Contrast-limited adaptive histogram equalization; enhances local contrast
3. **Gamma Correction** - Brightens dark regions (γ = 0.7 in this notebook)
4. **Unsharp Masking** - Enhances edge details
5. **Edge Enhancement** - Highlights structural boundaries
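
The `ImagePreprocessor` class is imported from `img_preprocessing` and is not shown in this notebook, so its exact implementations are unknown here. As a rough illustration of what methods 1 and 3 typically compute, the sketch below implements global histogram equalization and gamma correction in plain NumPy for 8-bit grayscale input. The function names `equalize_histogram` and `gamma_correct` are hypothetical, and the gamma convention (output = input^γ on a 0 to 1 scale, so γ < 1 brightens) is an assumption chosen to match the "brightens dark regions" description above.

```python
import numpy as np


def equalize_histogram(gray: np.ndarray) -> np.ndarray:
    """Global histogram equalization for an 8-bit grayscale image (sketch)."""
    hist = np.bincount(gray.ravel(), minlength=256)
    cdf = hist.cumsum()
    cdf_min = cdf[cdf > 0][0]      # CDF value at the darkest occupied level
    total = gray.size
    if total == cdf_min:           # constant image: nothing to equalize
        return gray.copy()
    # Map each level so the occupied range stretches over 0..255
    lut = np.round((cdf - cdf_min) / (total - cdf_min) * 255.0)
    lut = np.clip(lut, 0, 255).astype(np.uint8)
    return lut[gray]


def gamma_correct(img: np.ndarray, gamma: float = 0.7) -> np.ndarray:
    """Power-law mapping out = 255 * (in / 255) ** gamma (assumed convention)."""
    lut = np.round(255.0 * (np.arange(256) / 255.0) ** gamma).astype(np.uint8)
    return lut[img]


# Usage on a synthetic dark image
rng = np.random.default_rng(0)
dark = rng.integers(0, 64, size=(32, 32), dtype=np.uint8)
print("original mean:", dark.mean())
print("equalized mean:", equalize_histogram(dark).mean())
print("gamma mean:", gamma_correct(dark, gamma=0.7).mean())
```

Both functions reduce to a 256-entry lookup table, which is why these two methods are cheap compared with the neighborhood-based ones (CLAHE, unsharp masking, edge enhancement).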

In [None]:
# Import Required Libraries
import os
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from pathlib import Path
import cv2
from PIL import Image
import warnings
import random
import time
from collections import defaultdict
warnings.filterwarnings('ignore')

# Import our custom preprocessing class
from img_preprocessing import ImagePreprocessor

# Set matplotlib style
plt.style.use('default')
sns.set_palette("husl")

print(f"OpenCV version: {cv2.__version__}")
print("ImagePreprocessor class imported successfully!")

In [None]:
# Configuration
INPUT_FOLDER = "./datasets/clustering_sample_100"  # Input folder with images
OUTPUT_FOLDER = "./results_img_pp"                  # Output folder for processed images
IMG_SIZE = (224, 224)                               # Standard image size
RANDOM_SEED = 42
EXAMPLE_COUNT = 4                                   # Number of example images to visualize

# Set random seed
np.random.seed(RANDOM_SEED)
random.seed(RANDOM_SEED)

print(f"Input folder: {INPUT_FOLDER}")
print(f"Output folder: {OUTPUT_FOLDER}")
print(f"Target image size: {IMG_SIZE}")
print(f"Example images to show: {EXAMPLE_COUNT}")

In [None]:
# Create output directory structure
def create_output_directories(base_output_path: str):
    """Create directory structure for processed images."""
    methods = ['original', 'hist_eq', 'clahe', 'gamma', 'unsharp', 'edge_enhanced']
    
    directories = {}
    for method in methods:
        method_dir = Path(base_output_path) / method
        method_dir.mkdir(parents=True, exist_ok=True)
        directories[method] = method_dir
    
    return directories

# Create output directories
output_dirs = create_output_directories(OUTPUT_FOLDER)
print("Created output directories:")
for method, directory in output_dirs.items():
    print(f"  {method}: {directory}")

In [None]:
def load_image_data(image_path: str, target_size: tuple = IMG_SIZE):
    """
    Load and prepare image data.
    
    Args:
        image_path: Path to the image file
        target_size: Target size for resizing (width, height)
    
    Returns:
        dict: Dictionary containing image data and statistics
    """
    if not os.path.exists(image_path):
        return None
    
    try:
        # Load the image and normalize palette, alpha, and other modes to RGB so
        # the grayscale conversion below always sees either 1 or 3 channels
        img_pil = Image.open(image_path)
        if img_pil.mode not in ('L', 'RGB'):
            img_pil = img_pil.convert('RGB')
        
        # Resize image
        img_pil_resized = img_pil.resize(target_size, Image.Resampling.LANCZOS)
        img_resized = np.array(img_pil_resized)
        
        # Convert to grayscale for statistics
        if img_resized.ndim == 3:
            img_gray = cv2.cvtColor(img_resized, cv2.COLOR_RGB2GRAY)
        else:
            img_gray = img_resized
        
        # Calculate statistics (compute the mean once and reuse it)
        mean_intensity = float(np.mean(img_gray))
        stats = {
            'mean_intensity': mean_intensity,
            'std_intensity': float(np.std(img_gray)),
            'min_intensity': int(np.min(img_gray)),
            'max_intensity': int(np.max(img_gray)),
            'brightness_category': ('dark' if mean_intensity < 85
                                    else 'medium' if mean_intensity < 170
                                    else 'bright')
        }
        
        return {
            'resized': img_resized,
            'gray': img_gray,
            'stats': stats,
            'filename': Path(image_path).name, 
            'path': image_path
        }
    except Exception as e:
        print(f"Error loading {image_path}: {e}")
        return None

In [None]:
def apply_all_preprocessing_methods(img_data):
    """
    Apply all 5 preprocessing methods to the input image.
    
    Args:
        img_data: Dictionary containing image data
    
    Returns:
        dict: Dictionary containing all preprocessed versions
    """
    if img_data is None:
        return None
        
    img = img_data['resized']
    
    try:
        methods = {
            'original': img,
            'hist_eq': ImagePreprocessor.method_1_histogram_equalization(img),
            'clahe': ImagePreprocessor.method_2_clahe(img),
            'gamma': ImagePreprocessor.method_3_gamma_correction(img, gamma=0.7),
            'unsharp': ImagePreprocessor.method_4_unsharp_masking(img),
            'edge_enhanced': ImagePreprocessor.method_5_edge_enhancement(img)
        }
        return methods
    except Exception as e:
        print(f"Error processing {img_data['filename']}: {e}")
        return None

In [None]:
def save_processed_image(processed_img, output_path: str):
    """Save processed image to output path."""
    try:
        if len(processed_img.shape) == 2:  # Grayscale
            img_pil = Image.fromarray(processed_img, mode='L')
        else:  # RGB
            img_pil = Image.fromarray(processed_img, mode='RGB')
        
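        # The data is always encoded as PNG, so make the file extension match
        # (inputs may be .jpg or .jpeg)
        img_pil.save(Path(output_path).with_suffix('.png'), 'PNG')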
        return True
    except Exception as e:
        print(f"Error saving {output_path}: {e}")
        return False

In [None]:
# Load and analyze input images
print("=" * 80)
print("LOADING INPUT IMAGES")
print("=" * 80)

# Define these up front so later cells can test them even if the folder is missing
IMAGE_EXTENSIONS = {'.png', '.jpg', '.jpeg'}
image_files = []
example_files = []

if not os.path.exists(INPUT_FOLDER):
    print(f"Error: Input folder {INPUT_FOLDER} does not exist!")
    print("Available dataset directories:")
    datasets_dir = Path("./datasets")
    if datasets_dir.exists():
        for subdir in datasets_dir.iterdir():
            if subdir.is_dir():
                img_count = len([f for f in subdir.iterdir() if f.suffix.lower() in IMAGE_EXTENSIONS])
                print(f"  {subdir.name}: {img_count} images")
else:
    # Get all image files
    image_files = [f for f in Path(INPUT_FOLDER).iterdir() if f.suffix.lower() in IMAGE_EXTENSIONS]
    print(f"Found {len(image_files)} images in {INPUT_FOLDER}")
    
    # Select random sample for examples
    if len(image_files) > 0:
        example_files = random.sample(image_files, min(EXAMPLE_COUNT, len(image_files)))
        print(f"Selected {len(example_files)} example images for visualization")
        
        # Show selected examples
        print("\nExample images:")
        for i, img_file in enumerate(example_files, 1):
            print(f"  {i}. {img_file.name}")
    else:
        print("No images found in input folder!")
        example_files = []

In [None]:
# Visualize example images with their statistics
def visualize_example_images(image_files):
    """Visualize example images and their basic statistics."""
    if not image_files:
        print("No images to visualize.")
        return
    
    n_images = len(image_files)
    # squeeze=False keeps axes two-dimensional even when there is a single image
    fig, axes = plt.subplots(2, n_images, figsize=(5*n_images, 10), squeeze=False)
    
    for i, img_file in enumerate(image_files):
        img_data = load_image_data(str(img_file))
        if img_data is None:
            continue
            
        stats = img_data['stats']
        
        # Show image
        axes[0, i].imshow(img_data['resized'])
        axes[0, i].set_title(f"{img_data['filename']}\nBrightness: {stats['brightness_category']}")
        axes[0, i].axis('off')
        
        # Show histogram
        axes[1, i].hist(img_data['gray'].flatten(), bins=50, alpha=0.7, color='blue')
        axes[1, i].axvline(stats['mean_intensity'], color='red', linestyle='--', 
                          label=f"Mean: {stats['mean_intensity']:.1f}")
        axes[1, i].set_xlabel('Pixel Intensity')
        axes[1, i].set_ylabel('Frequency')
        axes[1, i].set_title(f'Histogram (Mean: {stats["mean_intensity"]:.1f})')
        axes[1, i].legend()
        axes[1, i].grid(True, alpha=0.3)
    
    plt.tight_layout()
    plt.show()

if example_files:
    print("\n" + "=" * 80)
    print("VISUALIZING EXAMPLE IMAGES")
    print("=" * 80)
    visualize_example_images(example_files)

In [None]:
# Demonstrate preprocessing methods on one example
def visualize_preprocessing_methods(img_data):
    """Visualize all preprocessing methods side by side."""
    if img_data is None:
        return
        
    processed_images = apply_all_preprocessing_methods(img_data)
    if processed_images is None:
        return
    
    method_names = [
        'Original',
        'Histogram Equalization', 
        'CLAHE',
        'Gamma Correction (γ=0.7)',
        'Unsharp Masking',
        'Edge Enhancement'
    ]
    
    fig, axes = plt.subplots(2, 3, figsize=(18, 12))
    axes = axes.flatten()
    
    for i, (key, name) in enumerate(zip(processed_images.keys(), method_names)):
        img = processed_images[key]
        axes[i].imshow(img, cmap='gray' if len(img.shape) == 2 else None)
        axes[i].set_title(f"{name}\nMean: {np.mean(img):.1f}")
        axes[i].axis('off')
    
    plt.tight_layout()
    plt.suptitle(f"Preprocessing Methods Comparison - {img_data['filename']}", y=1.02, fontsize=16)
    plt.show()
    
    return processed_images

if example_files:
    print("\n" + "=" * 80)
    print("DEMONSTRATING PREPROCESSING METHODS")
    print("=" * 80)
    
    # Show preprocessing on the first example
    first_example = load_image_data(str(example_files[0]))
    if first_example:
        demo_processed = visualize_preprocessing_methods(first_example)
        print(f"\nPreprocessing demonstrated on: {first_example['filename']}")

In [None]:
# Process all images in the folder
def process_image_folder(input_folder: str, output_dirs: dict):
    """Process all images in the input folder and save results."""
    image_files = [f for f in Path(input_folder).iterdir() 
                   if f.suffix.lower() in ['.png', '.jpg', '.jpeg']]
    
    stats = {
        'total_processed': 0,
        'total_failed': 0,
        'brightness_distribution': defaultdict(int),
        'method_stats': defaultdict(lambda: {'saved': 0, 'failed': 0})
    }
    
    if not image_files:
        print("No images found to process.")
        # Return the same (stats, duration) pair the caller unpacks
        return stats, 0.0
    
    print(f"Processing {len(image_files)} images...")
    print("-" * 60)
    
    start_time = time.time()
    
    for i, img_file in enumerate(image_files, 1):
        # Load image
        img_data = load_image_data(str(img_file))
        if img_data is None:
            stats['total_failed'] += 1
            continue
        
        # Track brightness distribution
        stats['brightness_distribution'][img_data['stats']['brightness_category']] += 1
        
        # Apply preprocessing methods
        processed_images = apply_all_preprocessing_methods(img_data)
        if processed_images is None:
            stats['total_failed'] += 1
            continue
        
        # Save processed images
        filename = img_data['filename']
        for method, processed_img in processed_images.items():
            output_path = output_dirs[method] / filename
            if save_processed_image(processed_img, str(output_path)):
                stats['method_stats'][method]['saved'] += 1
            else:
                stats['method_stats'][method]['failed'] += 1
        
        stats['total_processed'] += 1
        
        # Progress update
        if i % 100 == 0 or i == len(image_files):
            print(f"Processed {i}/{len(image_files)} images ({(i/len(image_files)*100):.1f}%)")
    
    end_time = time.time()
    duration = end_time - start_time
    
    return stats, duration

# Execute processing
if image_files:
    print("\n" + "=" * 80)
    print("PROCESSING ALL IMAGES")
    print("=" * 80)
    
    processing_stats, processing_duration = process_image_folder(INPUT_FOLDER, output_dirs)
    
    # Display results
    print("\n" + "=" * 80)
    print("PROCESSING SUMMARY")
    print("=" * 80)
    
    print(f"Processing time: {processing_duration:.2f} seconds")
    print(f"Images processed: {processing_stats['total_processed']}")
    print(f"Failed images: {processing_stats['total_failed']}")
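    if processing_duration > 0:
        print(f"Processing rate: {processing_stats['total_processed']/processing_duration:.2f} images/second")
    
    # Brightness is recorded for every image that loaded, including any that later
    # failed preprocessing, so use the categorized total as the denominator
    print("\nBrightness distribution:")
    total_categorized = sum(processing_stats['brightness_distribution'].values())
    for category, count in processing_stats['brightness_distribution'].items():
        percentage = (count / total_categorized) * 100
        print(f"  {category}: {count} images ({percentage:.1f}%)")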
    
    print("\nMethod processing statistics:")
    for method, method_stats in processing_stats['method_stats'].items():
        saved = method_stats['saved']
        failed = method_stats['failed']
        total = saved + failed
        success_rate = (saved / total * 100) if total > 0 else 0
        print(f"  {method}: {saved} saved, {failed} failed ({success_rate:.1f}% success)")
    
    print(f"\nResults saved to: {OUTPUT_FOLDER}/")
    for method, directory in output_dirs.items():
        file_count = len([f for f in directory.iterdir() if f.is_file()])
        print(f"  {method}: {file_count} images")
else:
    print("No images to process.")

In [None]:
# Create a comparison visualization for different brightness categories
def analyze_brightness_categories(input_folder: str, max_per_category: int = 2):
    """Analyze and visualize images from different brightness categories."""
    image_files = [f for f in Path(input_folder).iterdir() 
                   if f.suffix.lower() in ['.png', '.jpg', '.jpeg']]
    
    if not image_files:
        return
    
    # Categorize images by brightness
    brightness_categories = {'dark': [], 'medium': [], 'bright': []}
    
    for img_file in image_files:
        img_data = load_image_data(str(img_file))
        if img_data:
            category = img_data['stats']['brightness_category']
            if len(brightness_categories[category]) < max_per_category:
                brightness_categories[category].append((img_file, img_data))
    
    # Visualize examples from each category
    categories_with_data = {k: v for k, v in brightness_categories.items() if v}
    
    if not categories_with_data:
        print("No categorized images found.")
        return
    
    print("\nBrightness category analysis:")
    for category, images in categories_with_data.items():
        print(f"  {category}: {len(images)} example(s)")
    
    # Create visualization
    n_categories = len(categories_with_data)
    n_cols = max_per_category
    
    # squeeze=False always returns a 2-D axes array, including the 1x1 case
    # (one category, max_per_category=1), where subplots would return a bare Axes
    fig, axes = plt.subplots(n_categories, n_cols, figsize=(n_cols*5, n_categories*4),
                             squeeze=False)
    
    for row, (category, images) in enumerate(categories_with_data.items()):
        for col in range(n_cols):
            if col < len(images):
                img_file, img_data = images[col]
                axes[row, col].imshow(img_data['resized'])
                axes[row, col].set_title(f"{category.upper()}\n{img_data['filename']}\nMean: {img_data['stats']['mean_intensity']:.1f}")
            else:
                axes[row, col].text(0.5, 0.5, 'No image', ha='center', va='center',
                                   transform=axes[row, col].transAxes)
            axes[row, col].axis('off')
    
    plt.tight_layout()
    plt.suptitle('Brightness Category Examples', y=1.02, fontsize=16)
    plt.show()

if image_files:
    print("\n" + "=" * 80)
    print("BRIGHTNESS CATEGORY ANALYSIS")
    print("=" * 80)
    analyze_brightness_categories(INPUT_FOLDER)

## Interpretation Guide

### Understanding the Results:

**1. Preprocessing Effects:**
- **Histogram Equalization**: Good for very dark images, but may over-enhance noise
- **CLAHE**: More balanced enhancement, preserves local details
- **Gamma Correction**: Specifically targets dark regions
- **Unsharp Masking**: Enhances edges and fine details
- **Edge Enhancement**: Highlights structural boundaries
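
To make the unsharp masking effect concrete, the sketch below applies the usual formula `sharpened = original + amount * (original - blurred)` to a synthetic step edge. It is an illustration only: `box_blur` and `unsharp_mask` are hypothetical names, a simple box (mean) filter stands in for the Gaussian blur that implementations commonly use, and nothing here is taken from `ImagePreprocessor`.

```python
import numpy as np


def box_blur(gray: np.ndarray, radius: int = 1) -> np.ndarray:
    """Mean filter with edge replication (stand-in for a Gaussian blur)."""
    k = 2 * radius + 1
    padded = np.pad(gray.astype(np.float64), radius, mode="edge")
    out = np.zeros(gray.shape, dtype=np.float64)
    # Sum the k*k shifted copies of the padded image, then average
    for dy in range(k):
        for dx in range(k):
            out += padded[dy:dy + gray.shape[0], dx:dx + gray.shape[1]]
    return out / (k * k)


def unsharp_mask(gray: np.ndarray, amount: float = 1.0, radius: int = 1) -> np.ndarray:
    """sharpened = original + amount * (original - blurred), clipped to 8 bits."""
    g = gray.astype(np.float64)
    sharpened = g + amount * (g - box_blur(gray, radius))
    return np.clip(np.round(sharpened), 0, 255).astype(np.uint8)


# A vertical step edge: left half at 100, right half at 150
step = np.empty((8, 8), dtype=np.uint8)
step[:, :4] = 100
step[:, 4:] = 150
print(unsharp_mask(step)[0])
```

Flat regions are left unchanged, while the pixels on either side of the edge are pushed further apart. The same mechanism amplifies sensor noise, which is worth keeping in mind for dark images.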

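**2. Feature Map Insights** (applies only if the processed images are passed to a CNN feature extractor downstream; this notebook does not compute feature maps):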
- **Early layers** (block1, conv1): Detect basic edges and textures
- **Middle layers** (block3, conv3): Detect patterns and shapes
- **Late layers** (block5, conv5): Detect complex object parts

**3. Why Dark Images May Cluster Differently:**
- **Noise sensitivity**: Dark images may have different noise patterns
- **Feature extraction**: CNNs may struggle with low-contrast regions
- **Preprocessing dependency**: Different enhancement methods reveal different features
- **Infrastructure visibility**: Track details may be more/less visible in darkness

**4. Clustering Factors to Consider:**
- **Lighting conditions**: Time of day, artificial lighting
- **Track type**: Ballasted vs embedded tracks
- **Infrastructure age**: Newer vs older installations
- **Environmental factors**: Weather, season, maintenance level
- **Camera settings**: Exposure, ISO, white balance

### Recommendations:
1. **Compare multiple dark images** to identify common patterns
2. **Test different preprocessing methods** in your clustering pipeline
3. **Consider ensemble features** from multiple preprocessing methods
4. **Analyze misclassified examples** to understand failure modes
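
One possible way to try recommendation 3 is sketched below: compute a small feature vector for each preprocessed version of an image and concatenate them. This is a minimal sketch under stated assumptions. `histogram_features` and `ensemble_features` are hypothetical helpers, and a normalized intensity histogram stands in for whatever feature extractor the clustering pipeline actually uses. The input is assumed to be a dict of 8-bit arrays keyed by method name, like the one returned by `apply_all_preprocessing_methods`.

```python
import numpy as np


def histogram_features(img: np.ndarray, bins: int = 16) -> np.ndarray:
    """Normalized intensity histogram of an 8-bit image (placeholder feature)."""
    hist, _ = np.histogram(img, bins=bins, range=(0, 256))
    return hist / max(hist.sum(), 1)


def ensemble_features(versions: dict, bins: int = 16) -> np.ndarray:
    """Concatenate per-method features; keys are sorted for a stable order."""
    return np.concatenate(
        [histogram_features(versions[name], bins) for name in sorted(versions)]
    )


# Usage with two synthetic "versions" of the same image
versions = {
    "original": np.zeros((4, 4), dtype=np.uint8),
    "gamma": np.full((4, 4), 200, dtype=np.uint8),
}
features = ensemble_features(versions)
print(features.shape)
```

Sorting the keys keeps the feature layout identical across images, which matters once the vectors are stacked into a matrix for clustering.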