🎯 Computer Vision Project - Complete Pipeline

## 📋 Table of Contents
1. [Project Overview](#Project-overview)
2. [SETUP & IMPORTS](#SETUP-&-IMPORTS)
3. [Data Engineering Pipeline](#data-engineering-pipeline)
4. [Exploratory Data Analysis](#Exploratory-data-analysis)
5. [Model Building & Training](#Model-building--training)
6. [Model Evaluation](#Model-evaluation)
7. [EXPERIMENT WORKFLOW](#EXPERIMENT-WORKFLOW)

# Project Overview

This notebook implements a complete computer vision pipeline for image classification using transfer learning. The pipeline includes:

- 📊 Data augmentation and preprocessing
- 🔍 Exploratory data analysis
- 🤖 Multiple model training (VGG16, VGG19, InceptionV3)
- 📈 Comprehensive model evaluation
- 📊 Performance comparison and visualization

**Key Features:**
- Automated data splitting (Train/Validation/Test)
- Multiple data augmentation techniques
- Transfer learning 
- Comprehensive evaluation metrics
- Production-ready pipeline structure

# SETUP & IMPORTS

In [None]:
import os
import shutil
import cv2
import gc
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import tensorflow as tf
from tqdm import tqdm
from itertools import cycle
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix, roc_curve, auc
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras import Model, Input, regularizers
from tensorflow.keras.layers import Dense, GlobalAveragePooling2D, Dropout, BatchNormalization
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping, ReduceLROnPlateau, TensorBoard
from tensorflow.keras.applications import VGG16, VGG19, InceptionV3
from tensorflow.keras.applications.vgg16 import preprocess_input as vgg16_preprocess
from tensorflow.keras.applications.vgg19 import preprocess_input as vgg19_preprocess
from tensorflow.keras.applications.inception_v3 import preprocess_input as inception_preprocess

# data engineering pipeline

In [None]:
def create_dataset_pipeline(source_dir, base_dir, image_size=(224, 224), val_ratio=0.20):
    """
    Build the final train / validation / test folders from a source dataset.

    The original training files are split into train and validation subsets
    FIRST (stratified by class), and only then is each subset augmented.
    Splitting before augmenting keeps every variant of one source image
    (resized, rotated, flipped) on the same side of the split, so the
    validation set never contains near-duplicates of training images.

    Images are augmented and written one at a time instead of being collected
    in memory, and only the three output folders are cleared; anything else
    stored under ``base_dir`` is left untouched.

    Parameters:
    - source_dir: Directory containing 'train/<class>/' and 'test/<class>/' folders
    - base_dir: Output directory that receives 'train', 'validation' and 'test'
    - image_size: Target size as (height, width) (default: (224, 224))
    - val_ratio: Fraction of the original training files used for validation

    Returns:
    - dict with keys 'train_dir', 'validation_dir', 'test_dir', 'class_labels'

    Raises:
    - ValueError (from train_test_split) when the training set is empty or a
      class has too few files for a stratified split.
    """
    print("Starting Data Engineering Pipeline...")

    # Final directories
    train_dir = os.path.join(base_dir, 'train')
    validation_dir = os.path.join(base_dir, 'validation')
    test_dir = os.path.join(base_dir, 'test')

    # Clear only the folders this pipeline owns; base_dir may hold other files.
    for split_dir in (train_dir, validation_dir, test_dir):
        if os.path.exists(split_dir):
            print(f"Removing old output directory: {split_dir}")
            shutil.rmtree(split_dir)
        os.makedirs(split_dir, exist_ok=True)

    def list_images(src_dir):
        """Return [(class_name, filename), ...] plus the sorted class labels."""
        class_labels = sorted(
            d for d in os.listdir(src_dir) if os.path.isdir(os.path.join(src_dir, d))
        )
        entries = []
        for class_name in class_labels:
            class_path = os.path.join(src_dir, class_name)
            # Sorted so the seeded split below is reproducible across runs.
            for filename in sorted(os.listdir(class_path)):
                if os.path.isfile(os.path.join(class_path, filename)):
                    entries.append((class_name, filename))
        return entries, class_labels

    def augment_and_save(entries, src_dir, dest_dir):
        """Resize + rotate + flip each image and write the variants; return the count written."""
        written = 0
        progress = tqdm(entries, desc=f"Augmenting into {os.path.basename(dest_dir)}", unit="img")
        for class_name, filename in progress:
            img = cv2.imread(os.path.join(src_dir, class_name, filename))
            if img is None:
                # Not a readable image; skip it.
                continue

            # cv2.resize expects (width, height); image_size is (height, width).
            resized = cv2.resize(img, (image_size[1], image_size[0]))
            variants = (
                ("orig", resized),
                ("rot", cv2.rotate(resized, cv2.ROTATE_90_CLOCKWISE)),
                ("flip", cv2.flip(resized, 0)),
            )

            cls_dir = os.path.join(dest_dir, class_name)
            os.makedirs(cls_dir, exist_ok=True)
            base_name, ext = os.path.splitext(filename)
            for suffix, variant in variants:
                if cv2.imwrite(os.path.join(cls_dir, f"{base_name}_{suffix}{ext}"), variant):
                    written += 1

        print(f"Original images: {len(entries)}")
        print(f"Augmented images (including original resized): {written}")
        return written

    # Split the ORIGINAL train files into train/validation
    source_train_dir = os.path.join(source_dir, 'train')
    print("\n=== Splitting TRAIN set into train/validation ===")
    train_entries, class_labels = list_images(source_train_dir)
    train_subset, val_subset = train_test_split(
        train_entries,
        test_size=val_ratio,
        random_state=42,
        stratify=[class_name for class_name, _ in train_entries]
    )

    # Augment TRAIN and VALIDATION separately
    print("\n=== Augmenting TRAIN set ===")
    train_count = augment_and_save(train_subset, source_train_dir, train_dir)
    print("\n=== Augmenting VALIDATION set ===")
    val_count = augment_and_save(val_subset, source_train_dir, validation_dir)

    # Augment TEST
    print("\n=== Augmenting TEST set ===")
    source_test_dir = os.path.join(source_dir, 'test')
    test_entries, _ = list_images(source_test_dir)
    test_count = augment_and_save(test_entries, source_test_dir, test_dir)

    # Final stats
    print("\nPipeline finished successfully.")
    print(f"Final TRAIN images: {train_count}")
    print(f"Final VALIDATION images: {val_count}")
    print(f"Final TEST images: {test_count}")

    return {
        'train_dir': train_dir,
        'validation_dir': validation_dir,
        'test_dir': test_dir,
        'class_labels': class_labels
    }


 # Exploratory Data Analysis

In [None]:
def visualize_dataset_distribution(dataset_dir, result_dir=None, data_type=None):
    """
    Plot the per-class file counts of every split found in a dataset directory.

    Expected layout (each split is optional):
        train/
            class1/
            class2/
        test/
            class1/
            class2/
        validation/ or val/
            class1/
            class2/

    The splits that exist are detected automatically. Classes are plotted in
    sorted order, and a class missing from a split is shown with a count of
    zero. If no split folder or no class folder is found, a message is
    printed and nothing is plotted or saved.

    Parameters:
    - dataset_dir: Path to the dataset directory
    - result_dir: Directory to save the visualization (optional)
    - data_type: Dataset type identifier for filename (e.g., 'original', 'preprocessed') (optional)

    Returns:
    - None
    """

    def count_files(directory):
        """Count the entries in each class folder of a split directory."""
        if directory is None or not os.path.isdir(directory):
            return {}
        return {
            class_name: len(os.listdir(os.path.join(directory, class_name)))
            for class_name in sorted(os.listdir(directory))
            if os.path.isdir(os.path.join(directory, class_name))
        }

    # Detect splits
    splits = {}
    for split_name in ["train", "test", "validation", "val"]:
        split_path = os.path.join(dataset_dir, split_name)
        if os.path.isdir(split_path):
            splits[split_name] = split_path

    if not splits:
        print(f"No train/test/validation folders found inside {dataset_dir}")
        return

    # Collect counts per split and the union of class names across splits
    split_counts = {name: count_files(path) for name, path in splits.items()}
    all_classes = sorted(set().union(*split_counts.values()))

    if not all_classes:
        # An empty table has no "Class"/"Count" columns to plot.
        print(f"No class folders found in the splits inside {dataset_dir}")
        return

    # One row per (split, class); missing classes get a zero count
    data = []
    for split_name, counts in split_counts.items():
        for cls in all_classes:
            data.append({"Class": cls, "Count": counts.get(cls, 0), "Set": split_name.capitalize()})

    # Create DataFrame
    df = pd.DataFrame(data)

    # Plot
    fig = plt.figure(figsize=(14, 7))
    ax = sns.barplot(x="Class", y="Count", hue="Set", data=df)
    for container in ax.containers:
        ax.bar_label(container)
    plt.title("Dataset Class Distribution by Split")
    plt.ylabel("Number of Images")
    plt.xlabel("Class")
    plt.xticks(rotation=45)
    plt.tight_layout()

    # Save plot if result_dir is provided
    if result_dir is not None:
        os.makedirs(result_dir, exist_ok=True)

        # Filename encodes which splits were found and the optional data type
        splits_str = "_".join(sorted(splits.keys()))
        data_type_str = f"_{data_type}" if data_type else ""
        filename = f"{splits_str}_distribution{data_type_str}.png"
        save_path = os.path.join(result_dir, filename)

        plt.savefig(save_path)
        print(f"Visualization saved to: {save_path}")

    plt.show()
    # Release the figure so repeated calls do not accumulate open figures
    plt.close(fig)


    #plot training history
    
def plot_training_history(history, model_name, save_dir):
    """
    Draw accuracy and loss curves side by side and save them as a PNG.

    Parameters:
    - history: Object whose ``history`` dict holds 'accuracy', 'val_accuracy',
      'loss' and 'val_loss' sequences (one value per epoch)
    - model_name: Name used in the plot titles and in the output filename
    - save_dir: Directory for the image (created if missing)
    """
    records = history.history
    # (key in the history dict, label used in titles and legends)
    panels = (('accuracy', 'Accuracy'), ('loss', 'Loss'))

    plt.figure(figsize=(12, 5))
    for position, (metric_key, metric_title) in enumerate(panels, start=1):
        plt.subplot(1, len(panels), position)
        plt.plot(records[metric_key], label=f'Training {metric_title}')
        plt.plot(records[f'val_{metric_key}'], label=f'Validation {metric_title}')
        plt.title(f'Training and Validation {metric_title} - {model_name}')
        plt.xlabel('Epoch')
        plt.ylabel(metric_title)
        plt.legend()
    plt.tight_layout()

    os.makedirs(save_dir, exist_ok=True)
    save_path = os.path.join(save_dir, f'training_history_{model_name}.png')
    plt.savefig(save_path, dpi=300)
    plt.close()
    print(f"Training history plot saved to {save_path}")

# Model Building & Training

In [None]:

# Model configurations: for each architecture, the Keras application class, its preprocessing function, and its input shape
# Registry of the backbones to train. Each entry maps a short model name to
# the Keras application class, the matching preprocess_input function, and
# the (height, width, channels) input shape used to build the model.
model_configs = {
    name: {
        "model_class": model_class,
        "preprocess_fn": preprocess_fn,
        "input_shape": (side, side, 3),
    }
    for name, model_class, preprocess_fn, side in (
        ("vgg16", VGG16, vgg16_preprocess, 224),
        ("vgg19", VGG19, vgg19_preprocess, 224),
        # InceptionV3 is configured with a larger 299x299 input
        ("inceptionv3", InceptionV3, inception_preprocess, 299),
    )
}



def setup_datagenerator(train_dir, val_dir, test_dir, preprocess_fn, batch_size=16, image_size=(224, 224)):
    """
    Build directory-backed generators for the train, validation and test sets.

    Each generator applies ``preprocess_fn`` to every image it yields and
    performs no other augmentation or rescaling. Labels are one-hot encoded
    (``class_mode='categorical'``).

    Parameters:
    - train_dir: Path to training data directory
    - val_dir: Path to validation data directory
    - test_dir: Path to test data directory
    - preprocess_fn: Function passed as ``preprocessing_function`` to each generator
    - batch_size: Batch size for all three generators (default: 16)
    - image_size: Target image size as (height, width) (default: (224, 224))

    Returns:
    - train_generator: Training data generator (shuffled)
    - validation_generator: Validation data generator (not shuffled)
    - test_generator: Test data generator (not shuffled)
    """
    def directory_flow(directory, shuffle):
        # One ImageDataGenerator per split, all configured identically
        datagen = ImageDataGenerator(preprocessing_function=preprocess_fn)
        return datagen.flow_from_directory(
            directory,
            target_size=image_size,
            batch_size=batch_size,
            class_mode='categorical',
            shuffle=shuffle
        )

    # Only the training data is shuffled; validation and test keep directory
    # order so predictions can be matched against generator.classes.
    train_generator = directory_flow(train_dir, True)
    validation_generator = directory_flow(val_dir, False)
    test_generator = directory_flow(test_dir, False)

    return train_generator, validation_generator, test_generator

 # Build the model  
def build_model(base_model_class, preprocess_fn, num_classes, input_shape=(224, 224, 3)):
    """
    Assemble and compile a transfer-learning classifier on a frozen backbone.

    ``preprocess_fn`` is applied to the input tensor inside the model, before
    the backbone, so it is part of the model graph itself.

    Parameters:
    - base_model_class: Keras application constructor (called with
      include_top=False and ImageNet weights)
    - preprocess_fn: Preprocessing function applied to the model input
    - num_classes: Number of units in the softmax output layer
    - input_shape: Input shape as (height, width, channels)

    Returns:
    - Compiled Model (Adam with lr=0.001, categorical cross-entropy, accuracy)
    """
    # ImageNet backbone without its classification head, kept frozen
    backbone = base_model_class(
        weights='imagenet',
        include_top=False,
        input_shape=input_shape
    )
    backbone.trainable = False

    image_input = Input(shape=input_shape)
    # training=False keeps the backbone in inference mode
    features = backbone(preprocess_fn(image_input), training=False)

    # Classification head, applied in order
    head_layers = [
        GlobalAveragePooling2D(),
        BatchNormalization(),
        # Two L2-regularized dense blocks, each followed by dropout
        Dense(256, activation='relu', kernel_regularizer=regularizers.l2(1e-4)),
        Dropout(0.5),
        Dense(128, activation='relu', kernel_regularizer=regularizers.l2(1e-4)),
        Dropout(0.3),
        Dense(num_classes, activation='softmax'),
    ]
    tensor = features
    for layer in head_layers:
        tensor = layer(tensor)

    model = Model(image_input, tensor)
    model.compile(
        optimizer=Adam(learning_rate=0.001),
        loss='categorical_crossentropy',
        metrics=['accuracy']
    )
    return model


# Train the model 
def train_model(model_name, config, train_gen, val_gen, num_classes, epochs=50):
    """
    Build a model from ``config`` and fit it on the given generators.

    Parameters:
    - model_name: Name used for the checkpoint file and the TensorBoard log folder
    - config: Dict with 'model_class', 'preprocess_fn' and 'input_shape'
    - train_gen: Training data generator
    - val_gen: Validation data generator
    - num_classes: Number of output classes
    - epochs: Maximum number of epochs (default: 50)

    Returns:
    - (model, history): the fitted model and the object returned by model.fit
    """
    print(f"\nTraining model {model_name}.....")

    model = build_model(
        config['model_class'],
        config['preprocess_fn'],
        num_classes,
        config['input_shape']
    )

    training_callbacks = [
        # Keep the weights of the epoch with the highest validation accuracy on disk
        ModelCheckpoint(
            f'{model_name}_best_model.keras',
            monitor='val_accuracy',
            save_best_only=True,
            mode='max',
            verbose=1
        ),
        # Stop after 12 epochs without val_loss improvement and restore the best weights
        EarlyStopping(
            monitor='val_loss',
            patience=12,
            restore_best_weights=True,
            verbose=1
        ),
        # Halve the learning rate after 5 stagnant epochs, down to 1e-6
        ReduceLROnPlateau(
            monitor='val_loss',
            factor=0.5,
            patience=5,
            min_lr=1e-6,
            verbose=1
        ),
        TensorBoard(log_dir=f'logs/experiment_N1/{model_name}'),
    ]

    history = model.fit(
        train_gen,
        validation_data=val_gen,
        epochs=epochs,
        callbacks=training_callbacks,
        verbose=1
    )

    return model, history

 # Model Evaluation

In [None]:
def _save_current_figure(save_dir, filename, description):
    """Tighten, save (300 dpi) and close the current figure, then print its path."""
    plt.tight_layout()
    save_path = os.path.join(save_dir, filename)
    plt.savefig(save_path, dpi=300)
    plt.close()
    print(f"{description} saved to {save_path}")


def _decorate_roc_axes(model_name):
    """Apply the axis limits, labels, title and legend shared by all ROC plots."""
    plt.xlim([0.0, 1.0])
    plt.ylim([0.0, 1.05])
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title(f'ROC Curve - {model_name}')
    plt.legend(loc="lower right")


def evaluate_model(model, test_generator, model_name, class_names, save_dir):
    """
    Evaluate the model on test data and save a report, confusion matrix and ROC plot.

    The classification report and confusion matrix are computed over every
    index in ``class_names``, so they stay aligned with the class names even
    when a class is absent from the test labels or from the predictions.

    Parameters:
    - model: Trained model whose predict() returns one probability per class
    - test_generator: Test data generator exposing ``classes`` (true label indices);
      assumes it iterates in the same order as ``classes`` (shuffle=False) —
      confirm at the call site
    - model_name: Name used in titles and output filenames
    - class_names: Class names ordered by label index
    - save_dir: Directory for the report and figures (created if missing)

    Returns:
    - (test_accuracy, report): accuracy as a float and the report text
    """
    os.makedirs(save_dir, exist_ok=True)
    label_ids = list(range(len(class_names)))

    # Get predictions
    y_pred_probs = model.predict(test_generator)
    y_pred = np.argmax(y_pred_probs, axis=1)
    y_true = test_generator.classes

    # Calculate test accuracy
    test_accuracy = np.mean(y_pred == y_true)
    print(f"Test Accuracy for {model_name}: {test_accuracy:.4f}")

    # Classification report; explicit labels keep it in step with class_names
    report = classification_report(y_true, y_pred, labels=label_ids, target_names=class_names)
    print(f"Classification Report for {model_name}:\n{report}")
    report_path = os.path.join(save_dir, f"classification_report_{model_name}.txt")
    with open(report_path, "w") as f:
        f.write(report)
    print(f"Classification report saved to {report_path}")

    # Confusion matrix; explicit labels give one row/column per class name
    cm = confusion_matrix(y_true, y_pred, labels=label_ids)
    plt.figure(figsize=(10, 8))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=class_names, yticklabels=class_names)
    plt.title(f'Confusion Matrix - {model_name}')
    plt.xlabel('Predicted Label')
    plt.ylabel('True Label')
    _save_current_figure(save_dir, f'confusion_matrix_{model_name}.png', "Confusion matrix")

    # ROC Curve (for binary or multi-class classification)
    if len(class_names) == 2:
        # Binary classification: score is the probability of class index 1
        fpr, tpr, _ = roc_curve(y_true, y_pred_probs[:, 1])
        roc_auc = auc(fpr, tpr)

        plt.figure(figsize=(8, 6))
        plt.plot(fpr, tpr, color='darkorange', lw=2, label=f'ROC curve (area = {roc_auc:.2f})')
        plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
    else:
        # Multi-class classification: one-vs-rest curve per class
        y_true_bin = tf.keras.utils.to_categorical(y_true, len(class_names))

        fpr = {}
        tpr = {}
        roc_auc = {}
        for i in label_ids:
            fpr[i], tpr[i], _ = roc_curve(y_true_bin[:, i], y_pred_probs[:, i])
            roc_auc[i] = auc(fpr[i], tpr[i])

        # Micro-average: pool every (sample, class) decision into one curve
        fpr["micro"], tpr["micro"], _ = roc_curve(y_true_bin.ravel(), y_pred_probs.ravel())
        roc_auc["micro"] = auc(fpr["micro"], tpr["micro"])

        plt.figure(figsize=(10, 8))
        plt.plot(fpr["micro"], tpr["micro"],
                 label=f'micro-average ROC curve (area = {roc_auc["micro"]:.2f})',
                 color='deeppink', linestyle=':', linewidth=4)

        # The palette is cycled so any number of classes gets a color
        colors = cycle(['aqua', 'darkorange', 'cornflowerblue', 'green', 'red', 'purple', 'pink', 'brown', 'gray', 'olive'])
        for (i, class_name), color in zip(enumerate(class_names), colors):
            plt.plot(fpr[i], tpr[i], color=color, lw=2,
                     label=f'ROC curve of class {class_name} (area = {roc_auc[i]:.2f})')

        plt.plot([0, 1], [0, 1], 'k--', lw=2)

    _decorate_roc_axes(model_name)
    _save_current_figure(save_dir, f'roc_curve_{model_name}.png', "ROC curve")

    return test_accuracy, report

# Compare the models
def plot_model_comparison(model_scores, save_dir):
    """
    Save a bar chart comparing the accuracy of each model.

    Parameters:
    - model_scores: Dict mapping model name to accuracy
    - save_dir: Directory for 'model_comparison.png' (created if missing)
    """
    names = list(model_scores)
    accuracies = [model_scores[name] for name in names]

    plt.figure(figsize=(10, 6))
    rects = plt.bar(names, accuracies, color=['blue', 'green', 'red', 'purple', 'orange'])

    # Write each bar's value just above it, centered horizontally
    for rect in rects:
        top = rect.get_height()
        center_x = rect.get_x() + rect.get_width()/2.
        plt.text(center_x, top, f'{top:.4f}', ha='center', va='bottom')

    plt.title('Model Accuracy Comparison')
    plt.xlabel('Model')
    plt.ylabel('Accuracy')
    plt.ylim(0, 1)  # The y-axis is fixed to the 0-1 accuracy range
    plt.tight_layout()

    os.makedirs(save_dir, exist_ok=True)
    save_path = os.path.join(save_dir, 'model_comparison.png')
    plt.savefig(save_path, dpi=300)
    plt.close()
    print(f"Model comparison plot saved to {save_path}")

# EXPERIMENT WORKFLOW

In [None]:
# Define parameters
source_dir = "/content/drive/MyDrive/MRI_Orignal Data"
base_dir = "/content/drive/MyDrive/my_project"
result_dir = "/content/drive/MyDrive/my_project/results"
val_ratio = 0.10
batch_size = 16
num_classes = 4


# Step 1: Run Dataset Pipeline
# This runs before any plot is saved: result_dir is inside base_dir, and the
# pipeline clears previous output under base_dir when it starts.
pipeline_result = create_dataset_pipeline(source_dir, base_dir, image_size=(224, 224), val_ratio=val_ratio)
train_dir, val_dir, test_dir = pipeline_result['train_dir'], pipeline_result['validation_dir'], pipeline_result['test_dir']


# Step 2: Visualize Original Dataset (source_dir is only read by the pipeline)
visualize_dataset_distribution(source_dir, result_dir, 'original')


# Step 3: Visualize Processed Dataset
visualize_dataset_distribution(base_dir, result_dir, 'preprocessed')


# Step 4: Train and Evaluate Models
model_scores = {}
for model_name, config in model_configs.items():

    print(f"\n{'='*50}\nTraining {model_name}\n{'='*50}")
    image_size = config['input_shape'][:2]
    # build_model applies config['preprocess_fn'] inside the model, so the
    # generators must yield raw pixels; passing the function here as well
    # would preprocess every image twice.
    train_gen, val_gen, test_gen = setup_datagenerator(train_dir, val_dir, test_dir, None, batch_size, image_size)
    model, history = train_model(model_name, config, train_gen, val_gen, num_classes, epochs=100)
    plot_training_history(history, model_name, result_dir)
    class_names = list(train_gen.class_indices.keys())
    test_accuracy, report = evaluate_model(model, test_gen, model_name, class_names, result_dir)
    model_scores[model_name] = test_accuracy
    print(report)
    # Free the model graph and Python garbage before building the next model
    tf.keras.backend.clear_session()
    gc.collect()

# Step 5: Compare Models
plot_model_comparison(model_scores, result_dir)
print("\nPipeline completed successfully!")