# Image Preprocessing Pipeline

This notebook handles image preprocessing, data augmentation, and splitting the dataset into train/validation/test sets in preparation for training machine learning models.

In [None]:
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import cv2
from PIL import Image, ImageEnhance, ImageFilter
from pathlib import Path
import tensorflow as tf
from tensorflow.keras.preprocessing.image import ImageDataGenerator, load_img, img_to_array
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
import json
import pickle
from tqdm import tqdm
import shutil

# Project locations: raw datasets are read from DATASET_PATH, results go to PROCESSED_DATA_PATH
PROJECT_ROOT = Path('/Users/debabratapattnayak/web-dev/greencast')
DATASET_PATH = PROJECT_ROOT.joinpath('dataset')
PLANTVILLAGE_PATH = DATASET_PATH.joinpath('plantvillage dataset')
DATA2_PATH = DATASET_PATH.joinpath('data2')
PROCESSED_DATA_PATH = PROJECT_ROOT.joinpath('processed_data')

# Make sure the output root and one sub-folder per split exist.
# Parents are not created, so PROJECT_ROOT itself must already be present.
PROCESSED_DATA_PATH.mkdir(exist_ok=True)
PROCESSED_DATA_PATH.joinpath('train').mkdir(exist_ok=True)
PROCESSED_DATA_PATH.joinpath('validation').mkdir(exist_ok=True)
PROCESSED_DATA_PATH.joinpath('test').mkdir(exist_ok=True)

## Image Preprocessing Functions

In [None]:
class ImagePreprocessor:
    """Single-image preprocessing: enhance, denoise, resize/pad and normalize.

    Attributes:
        target_size: Output size as ``(width, height)`` in pixels; it is handed
            to ``cv2.resize`` in that order.
        normalize: When true, ``normalize_image`` converts to float32 and
            divides by 255; otherwise images pass through unchanged.
    """

    def __init__(self, target_size=(224, 224), normalize=True):
        self.target_size = target_size
        self.normalize = normalize

    def _fit_dimensions(self, width, height):
        """Return ``(new_w, new_h)`` that fits inside ``target_size``.

        The source aspect ratio is kept. The limiting side is chosen by
        comparing the source aspect ratio with the *target's* aspect ratio, so
        non-square targets never produce a size larger than the target box.
        Each side is clamped to at least 1 pixel so extreme aspect ratios do
        not collapse to a zero-sized image.
        """
        target_w, target_h = self.target_size
        aspect_ratio = width / height

        if aspect_ratio > target_w / target_h:
            # Source is relatively wider than the box: width is the limit
            new_w = target_w
            new_h = int(new_w / aspect_ratio)
        else:
            # Source is relatively taller (or equal): height is the limit
            new_h = target_h
            new_w = int(new_h * aspect_ratio)

        return min(max(1, new_w), target_w), min(max(1, new_h), target_h)

    def resize_image(self, image, maintain_aspect_ratio=True):
        """Resize ``image`` to ``target_size``.

        Args:
            image: Image array whose first two axes are (height, width).
            maintain_aspect_ratio: If true, scale the image to fit inside the
                target box and pad the remainder with the resized image's mean
                color; if false, stretch directly to ``target_size``.

        Returns:
            The resized (and possibly padded) image array.
        """
        if not maintain_aspect_ratio:
            return cv2.resize(image, self.target_size, interpolation=cv2.INTER_AREA)

        src_h, src_w = image.shape[:2]
        new_w, new_h = self._fit_dimensions(src_w, src_h)
        resized = cv2.resize(image, (new_w, new_h), interpolation=cv2.INTER_AREA)

        # Split the leftover space evenly; the odd pixel goes to bottom/right
        delta_w = self.target_size[0] - new_w
        delta_h = self.target_size[1] - new_h
        top, bottom = delta_h // 2, delta_h - (delta_h // 2)
        left, right = delta_w // 2, delta_w - (delta_w // 2)

        # Pad with the mean color so the border does not add strong edges
        mean_color = np.mean(resized, axis=(0, 1))
        return cv2.copyMakeBorder(resized, top, bottom, left, right,
                                  cv2.BORDER_CONSTANT, value=mean_color)

    def normalize_image(self, image):
        """Return ``image`` as float32 divided by 255 when ``self.normalize`` is set.

        When normalization is disabled the input is returned unchanged.
        """
        if self.normalize:
            return image.astype(np.float32) / 255.0
        return image

    def enhance_image(self, image, enhance_contrast=True, enhance_brightness=True):
        """Boost contrast (x1.2) and/or brightness (x1.1) of a BGR image.

        The image is converted BGR -> RGB for PIL's enhancers and converted
        back to BGR before being returned.
        """
        pil_image = Image.fromarray(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))

        if enhance_contrast:
            enhancer = ImageEnhance.Contrast(pil_image)
            pil_image = enhancer.enhance(1.2)  # Increase contrast by 20%

        if enhance_brightness:
            enhancer = ImageEnhance.Brightness(pil_image)
            pil_image = enhancer.enhance(1.1)  # Increase brightness by 10%

        return cv2.cvtColor(np.array(pil_image), cv2.COLOR_RGB2BGR)

    def remove_noise(self, image):
        """Denoise with a bilateral filter (diameter 9, sigmaColor 75, sigmaSpace 75).

        A bilateral filter is used to reduce noise while preserving edges.
        """
        return cv2.bilateralFilter(image, 9, 75, 75)

    def preprocess_image(self, image_path, apply_enhancements=True):
        """Run the full pipeline on one image file.

        Steps: load with ``cv2.imread`` -> optional enhance + denoise ->
        aspect-preserving resize with padding -> BGR to RGB -> normalize.

        Args:
            image_path: Path of the image to load (converted with ``str``).
            apply_enhancements: Whether to run ``enhance_image`` and
                ``remove_noise`` before resizing.

        Returns:
            The processed RGB image, or ``None`` if any step failed. Failures
            are printed rather than raised so that batch loops can keep going.
        """
        try:
            image = cv2.imread(str(image_path))
            if image is None:
                # cv2.imread signals an unreadable file by returning None
                raise ValueError(f"Could not load image: {image_path}")

            if apply_enhancements:
                image = self.enhance_image(image)
                image = self.remove_noise(image)

            image = self.resize_image(image)

            # Everything above works in OpenCV's BGR order; emit RGB
            image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

            return self.normalize_image(image)

        except Exception as e:
            # Deliberate best-effort: report and signal failure with None
            print(f"Error preprocessing {image_path}: {e}")
            return None

# Initialize preprocessor
# Module-level instance configured for 224x224 output; `normalize` keeps its default (True)
preprocessor = ImagePreprocessor(target_size=(224, 224))
print("Image preprocessor initialized!")

## Data Augmentation Setup

In [None]:
def create_data_generators():
    """Build the Keras image generators used for training and evaluation.

    Returns:
        A ``(train_datagen, val_test_datagen)`` tuple. The first generator
        rescales pixel values by 1/255 and applies random augmentation; the
        second only rescales, so validation and test images are not augmented.
    """
    rescale_factor = 1./255

    # Random transforms applied to training images only.
    # NOTE(review): Keras presumably applies `rescale` after the other
    # transforms, in which case channel_shift_range=0.1 acts on the raw 0-255
    # scale and is close to a no-op -- confirm the intended magnitude.
    augmentation_options = {
        'rotation_range': 20,
        'width_shift_range': 0.2,
        'height_shift_range': 0.2,
        'shear_range': 0.2,
        'zoom_range': 0.2,
        'horizontal_flip': True,
        'fill_mode': 'nearest',
        'brightness_range': [0.8, 1.2],
        'channel_shift_range': 0.1,
    }

    augmenting_generator = ImageDataGenerator(rescale=rescale_factor,
                                              **augmentation_options)
    plain_generator = ImageDataGenerator(rescale=rescale_factor)

    return augmenting_generator, plain_generator

def visualize_augmentation(image_path, num_augmented=8):
    """Plot an image next to randomly augmented variants of it.

    The figure holds ``num_augmented`` panels in total: the original image
    followed by ``num_augmented - 1`` augmented versions, four panels per row.
    The grid grows with ``num_augmented`` (the default of 8 gives the usual
    2x4 layout) and any unused panels in the last row are hidden.

    Args:
        image_path: Path of the image to load; it is resized to 224x224.
        num_augmented: Total number of panels to draw, including the original.
            Values below 1 are treated as 1 (original only).
    """
    n_panels = max(1, num_augmented)

    # Load original image and add a leading batch axis for ``datagen.flow``
    img = load_img(image_path, target_size=(224, 224))
    x = img_to_array(img)
    x = x.reshape((1,) + x.shape)

    # No `rescale` here: pixel values stay in 0-255 so they can be shown as uint8
    datagen = ImageDataGenerator(
        rotation_range=20,
        width_shift_range=0.2,
        height_shift_range=0.2,
        shear_range=0.2,
        zoom_range=0.2,
        horizontal_flip=True,
        fill_mode='nearest',
        brightness_range=[0.8, 1.2]
    )

    # Size the grid from the panel count instead of assuming 2x4
    n_cols = 4
    n_rows = -(-n_panels // n_cols)  # ceiling division
    _, axes = plt.subplots(n_rows, n_cols, figsize=(16, 4 * n_rows),
                           squeeze=False)  # always a 2-D array, even for 1 row
    axes = axes.ravel()

    # Show original
    axes[0].imshow(img)
    axes[0].set_title('Original')
    axes[0].axis('off')

    # ``flow`` yields batches indefinitely; zip against a finite range to stop
    # after the requested number without drawing an extra batch.
    for i, batch in zip(range(1, n_panels), datagen.flow(x, batch_size=1)):
        axes[i].imshow(batch[0].astype('uint8'))
        axes[i].set_title(f'Augmented {i}')
        axes[i].axis('off')

    # Hide panels that received no image
    for unused_ax in axes[n_panels:]:
        unused_ax.axis('off')

    plt.suptitle('Data Augmentation Examples')
    plt.tight_layout()
    plt.show()

# Create data generators
# train_datagen applies random augmentation; val_test_datagen only rescales pixel values
train_datagen, val_test_datagen = create_data_generators()
print("Data generators created!")

## Dataset Organization and Splitting

In [None]:
def organize_dataset(source_path, output_path, test_split=0.15, val_split=0.15, seed=42):
    """Copy a class-per-folder image dataset into train/validation/test folders.

    Every sub-directory of ``source_path`` is treated as one class. Its image
    files (.jpg, .jpeg, .png, .bmp, case-insensitive) are shuffled and copied
    into ``output_path/{train,validation,test}/<class_name>/``. Classes with
    fewer than 10 images are skipped. A summary is written to
    ``output_path/dataset_info.json``.

    Directory listings are sorted and the shuffle is seeded, so repeated runs
    with the same arguments yield the same split. This avoids the same image
    landing in different splits when the function is re-run into an existing
    output folder. Files already present in ``output_path`` are not removed.

    Args:
        source_path: Directory containing one sub-directory per class.
        output_path: Destination root; created (with parents) if missing.
        test_split: Fraction of each class copied to the test split.
        val_split: Fraction of each class copied to the validation split.
        seed: Seed for the shuffle. Pass ``None`` for a non-reproducible split.

    Returns:
        A dict with keys ``classes``, ``train_counts``, ``val_counts``,
        ``test_counts`` and ``total_images``.

    Raises:
        ValueError: If a split fraction is negative or the two fractions sum
            to more than 1 (which would leave a negative train count).
    """
    if test_split < 0 or val_split < 0 or test_split + val_split > 1:
        raise ValueError(
            "test_split and val_split must be non-negative and sum to at most 1, "
            f"got test_split={test_split}, val_split={val_split}"
        )

    source_path = Path(source_path)
    output_path = Path(output_path)

    # Keys match the '<key>_counts' entries of dataset_info
    split_dirs = {
        'train': output_path / 'train',
        'val': output_path / 'validation',
        'test': output_path / 'test',
    }
    for split_dir in split_dirs.values():
        # parents=True: output_path itself may not exist yet
        split_dir.mkdir(parents=True, exist_ok=True)

    # Sorted so the processing order does not depend on the filesystem
    class_names = sorted(d.name for d in source_path.iterdir() if d.is_dir())

    dataset_info = {
        'classes': [],
        'train_counts': {},
        'val_counts': {},
        'test_counts': {},
        'total_images': 0
    }

    image_suffixes = ('.jpg', '.jpeg', '.png', '.bmp')
    rng = np.random.default_rng(seed)

    for class_name in tqdm(class_names, desc="Processing classes"):
        class_path = source_path / class_name

        # Sort before shuffling: os.listdir order is arbitrary, and a seeded
        # shuffle is only reproducible from a fixed starting order.
        images = sorted(f for f in os.listdir(class_path)
                        if f.lower().endswith(image_suffixes))

        if len(images) < 10:  # Skip classes with too few images
            print(f"Skipping {class_name}: only {len(images)} images")
            continue

        rng.shuffle(images)

        n_test = int(len(images) * test_split)
        n_val = int(len(images) * val_split)
        n_train = len(images) - n_test - n_val

        split_images = {
            'train': images[:n_train],
            'val': images[n_train:n_train + n_val],
            'test': images[n_train + n_val:],
        }

        for split_name, file_names in split_images.items():
            class_out_dir = split_dirs[split_name] / class_name
            class_out_dir.mkdir(exist_ok=True)
            for file_name in file_names:
                shutil.copy2(class_path / file_name, class_out_dir / file_name)
            dataset_info[f'{split_name}_counts'][class_name] = len(file_names)

        dataset_info['classes'].append(class_name)
        dataset_info['total_images'] += len(images)

    # Save dataset info
    with open(output_path / 'dataset_info.json', 'w', encoding='utf-8') as f:
        json.dump(dataset_info, f, indent=2)

    print("\nDataset organization complete!")
    print(f"Total classes: {len(dataset_info['classes'])}")
    print(f"Total images: {dataset_info['total_images']}")
    print(f"Train images: {sum(dataset_info['train_counts'].values())}")
    print(f"Validation images: {sum(dataset_info['val_counts'].values())}")
    print(f"Test images: {sum(dataset_info['test_counts'].values())}")

    return dataset_info

# Split the PlantVillage "color" images into train/validation/test, if they are present
plantvillage_color_path = PLANTVILLAGE_PATH / 'color'
if not plantvillage_color_path.exists():
    print("PlantVillage color dataset not found!")
else:
    print("Organizing PlantVillage color dataset...")
    # Output goes to its own sub-folder of the processed-data root
    pv_dataset_info = organize_dataset(plantvillage_color_path,
                                       PROCESSED_DATA_PATH / 'plantvillage_color')