
  

# config.py

In [None]:
import os
import yaml
from pathlib import Path


class TrainingConfig:
    """Paths and hyperparameters shared by the training workflow.

    Note that instantiating the class has a filesystem side effect: the data,
    models and augmented-train directories are created if they do not exist.
    """

    def __init__(self):
        # Paths: BASE_DIR is the parent of the directory containing this file.
        self.BASE_DIR = Path(os.path.dirname(os.path.abspath(__file__))).parent
        self.DATA_DIR = self.BASE_DIR / "data"
        self.MODELS_DIR = self.BASE_DIR / "models"

        # Data paths
        self.TRAIN_LABELS_PATH = self.DATA_DIR / "CIFAR-10_Train_Labels.csv"
        self.TRAIN_IMAGES_DIR = self.DATA_DIR / "train"
        self.TEST_IMAGES_DIR = self.DATA_DIR / "test"
        self.AUGMENTED_TRAIN_DIR = self.DATA_DIR / "augmented_train"

        # Training parameters
        self.BATCH_SIZE = 32
        self.LEARNING_RATE = 0.001
        self.EPOCHS = 5
        self.NUM_CLASSES = 10
        self.VALIDATION_SPLIT = 0.2

        # Data augmentation
        self.AUGMENTATIONS_PER_IMAGE = 4
        self.SAMPLE_SIZE = 100

        # Model parameters
        self.DROPOUT_RATE = 0.3
        self.L1_FACTOR = 0.0001
        self.L2_FACTOR = 0.01

        # Early stopping
        self.PATIENCE = 5
        self.MIN_DELTA = 0.001

        # Create necessary directories
        self.create_directories()

    def create_directories(self):
        """Create the data, models and augmented-train directories if missing."""
        for directory in (self.DATA_DIR, self.MODELS_DIR, self.AUGMENTED_TRAIN_DIR):
            os.makedirs(directory, exist_ok=True)

    def save_config(self, path):
        """Write every public attribute to a YAML file.

        Args:
            path: Destination file; it is overwritten if it already exists.

        ``Path`` values are stored as plain strings so that the file can be
        read back with ``yaml.safe_load``.
        """
        config_dict = {
            key: str(value) if isinstance(value, Path) else value
            for key, value in vars(self).items()
            if not key.startswith('_')
        }

        with open(path, 'w', encoding='utf-8') as f:
            yaml.dump(config_dict, f, default_flow_style=False)

    @classmethod
    def load_config(cls, path):
        """Build a configuration from a YAML file.

        Args:
            path: YAML file, typically one produced by ``save_config``.

        Returns:
            A new instance whose known attributes are overridden by the file.
            Keys that are not attributes of a default instance are ignored,
            and values for attributes that default to a ``Path`` are converted
            back to ``Path``.
        """
        with open(path, 'r', encoding='utf-8') as f:
            config_dict = yaml.safe_load(f)

        # An empty YAML document loads as None; treat it as "no overrides"
        # instead of failing on ``None.items()``.
        if config_dict is None:
            config_dict = {}

        # Building the default instance also creates the default directories.
        config = cls()
        for key, value in config_dict.items():
            if not hasattr(config, key):
                continue
            if isinstance(getattr(config, key), Path):
                value = Path(value)
            setattr(config, key, value)
        return config


# preprocessing.py

In [None]:
import os
import pandas as pd
import matplotlib.pyplot as plt
import cv2
import numpy as np
from collections import Counter
import random

class CSVGenerator:
    """Builds a shuffled CSV index for a one-subdirectory-per-class image tree."""

    def script_to_generate_csv(self, fileName, base_dir, output_dir="../data"):
        """Index the images under ``base_dir`` and write the index as a CSV.

        Every sub-directory of ``base_dir`` is treated as a class label and
        every entry inside it as one image of that class. Entries of
        ``base_dir`` that are not directories are ignored.

        Args:
            fileName: Name of the CSV file to write, without the ``.csv`` suffix.
            base_dir: Root directory holding one sub-directory per label.
            output_dir: Directory that receives the CSV. Defaults to
                ``"../data"`` (relative to the current working directory).
                It must already exist.

        The CSV has the columns ``id`` (1-based, assigned after shuffling),
        ``image_path`` (path relative to ``base_dir``) and ``label``.
        """
        data_rows = []

        for label in os.listdir(base_dir):
            label_path = os.path.join(base_dir, label)
            if not os.path.isdir(label_path):
                continue
            for image_name in os.listdir(label_path):
                # Stored relative to base_dir so the consumer can re-root it.
                data_rows.append([os.path.join(label, image_name), label])

        df = pd.DataFrame(data_rows, columns=["image_path", "label"])

        # Shuffle the rows (unseeded), then number them in the shuffled order.
        df = df.sample(frac=1).reset_index(drop=True)
        df.insert(0, "id", range(1, len(df) + 1))

        output_csv_path = f"{output_dir}/{fileName}.csv"
        df.to_csv(output_csv_path, index=False)

        print(f"CSV file has been saved at: {output_csv_path}")




class CIFAR10PreProcessor:
    def __init__(self, train_labels_path, train_images_dir, test_images_dir):
        """
        Initialize the CIFAR10 data processor

        Args:
            train_labels_path: Path to training labels CSV
            train_images_dir: Directory containing training images
            test_images_dir: Directory containing test images
        """
        self.train_labels_path = train_labels_path
        self.train_images_dir = train_images_dir
        self.test_images_dir = test_images_dir
        self.train_labels = None
        self.load_and_analyze_dataset()


    def load_and_analyze_dataset(self):
        """Load and analyze the dataset, print basic statistics"""
        # Load labels
        self.train_labels = pd.read_csv(self.train_labels_path)

        # Convert id column to string and add .png extension
        self.train_labels['id'] = self.train_labels['id'].astype(str) + '.png'

        # Print basic statistics
        print("Dataset Overview:")
        print(f"Number of training samples: {len(self.train_labels)}")
        print(f"Number of training images: {len(os.listdir(self.train_images_dir))}")
        print(f"Number of testing images: {len(os.listdir(self.test_images_dir))}")

        # Class distribution
        class_dist = Counter(self.train_labels['label'])
        print("\nClass Distribution:")
        for label, count in class_dist.items():
            print(f"{label}: {count} images ({count / len(self.train_labels) * 100:.2f}%)")

    def normalize_image(self, img):
        """Normalize image pixel values to range [0,1]"""
        return img.astype('float32') / 255.0

    def augment_image(self, img, rotation=True, flip=True, brightness=True, zoom=True):
        """Apply various augmentation techniques to an image"""
        augmented = img.copy()

        if rotation:
            angle = np.random.uniform(-15, 15)
            height, width = img.shape[:2]
            center = (width / 2, height / 2)
            rotation_matrix = cv2.getRotationMatrix2D(center, angle, 1.0)
            augmented = cv2.warpAffine(augmented, rotation_matrix, (width, height))

        if flip and np.random.random() > 0.5:
            augmented = cv2.flip(augmented, 1)

        if brightness:
            beta = np.random.uniform(-30, 30)
            augmented = cv2.convertScaleAbs(augmented, beta=beta)

        if zoom:
            scale = np.random.uniform(0.8, 1.2)
            height, width = img.shape[:2]
            center = (width / 2, height / 2)
            zoom_matrix = cv2.getRotationMatrix2D(center, 0, scale)
            augmented = cv2.warpAffine(augmented, zoom_matrix, (width, height))

        return augmented

    def load_and_display_images(self, num_samples=5, random_state=None, normalize=True):
        """Show a row of sample training images with label and brightness.

        Args:
            num_samples: Number of panels to draw.
            random_state: When given, rows are drawn at random with this seed;
                otherwise the first ``num_samples`` rows of the table are used.
            normalize: Scale pixel values to [0, 1] before display.

        Returns:
            The matplotlib figure (it is also shown via ``plt.show()``).

        An image that cannot be loaded is reported on stdout and its panel
        shows an error text instead of aborting the whole figure.
        """
        if random_state is not None:
            samples = self.train_labels.sample(n=num_samples, random_state=random_state)
        else:
            samples = self.train_labels.head(num_samples)

        # squeeze=False always returns a 2-D array of axes; without it a single
        # panel comes back as a bare Axes object and indexing it fails.
        fig, axes = plt.subplots(1, num_samples, figsize=(15, 5), squeeze=False)
        axes = axes[0]

        # zip() stops at the shorter side, so a table with fewer rows than
        # num_samples no longer raises IndexError.
        for ax, (_, row) in zip(axes, samples.iterrows()):
            img_name = row['id']
            label = row['label']
            img_path = os.path.join(self.train_images_dir, img_name)

            try:
                img = cv2.imread(img_path)
                if img is None:
                    raise ValueError(f"Failed to load image: {img_path}")
                img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

                # Brightness is measured on the un-normalized pixels.
                brightness = np.mean(cv2.cvtColor(img, cv2.COLOR_RGB2GRAY))
                if normalize:
                    img = self.normalize_image(img)

                ax.imshow(img)
                ax.set_title(f"Label: {label}\nBrightness: {brightness:.1f}")
                ax.axis("off")

            except Exception as e:
                print(f"Error loading image {img_name}: {str(e)}")
                ax.text(0.5, 0.5, "Error loading image", ha='center')
                ax.axis("off")

        # Blank any panels for which there was no row to show.
        for ax in axes[len(samples):]:
            ax.axis("off")

        plt.tight_layout()
        plt.show()
        return fig

    def analyze_image_properties(self, sample_size=100, normalize=True):
        """Analyze properties of images in the dataset"""
        samples = self.train_labels.sample(n=min(sample_size, len(self.train_labels)))

        sizes = []
        brightness_values = []
        pixel_value_ranges = []
        corrupted = 0

        for _, row in samples.iterrows():
            img_path = os.path.join(self.train_images_dir, row['id'])
            try:
                img = cv2.imread(img_path)
                if img is None:
                    corrupted += 1
                    continue

                orig_img = img.copy()
                if normalize:
                    img = self.normalize_image(img)

                sizes.append(img.shape)
                brightness = np.mean(cv2.cvtColor(orig_img, cv2.COLOR_RGB2GRAY))
                brightness_values.append(brightness)

                pixel_value_ranges.append({
                    'min': img.min(),
                    'max': img.max(),
                    'mean': img.mean()
                })

            except Exception:
                corrupted += 1

        self._print_analysis_results(sample_size, corrupted, sizes, brightness_values,
                                     pixel_value_ranges, normalize)

        return sizes, brightness_values, pixel_value_ranges

    def _print_analysis_results(self, sample_size, corrupted, sizes, brightness_values,
                                pixel_value_ranges, normalize):
        """Helper method to print analysis results"""
        print("\nImage Analysis:")
        print(f"Sample size: {sample_size}")
        print(f"Corrupted images: {corrupted}")
        print(f"Unique image sizes: {set(sizes)}")
        print(f"Average brightness (original): {np.mean(brightness_values):.2f}")

        if normalize:
            pixel_stats = pd.DataFrame(pixel_value_ranges)
            print("\nNormalized Pixel Value Statistics:")
            print(f"Min: {pixel_stats['min'].mean():.3f}")
            print(f"Max: {pixel_stats['max'].mean():.3f}")
            print(f"Mean: {pixel_stats['mean'].mean():.3f}")

    def display_augmentations(self, img_index=0, num_augmentations=5):
        """Show one training image next to randomly augmented versions of it.

        Args:
            img_index: Positional row index into the label table.
            num_augmentations: Number of augmented panels drawn after the original.

        Raises:
            ValueError: If the selected image cannot be read.
        """
        img_path = os.path.join(self.train_images_dir, self.train_labels.iloc[img_index]['id'])
        img = cv2.imread(img_path)
        if img is None:
            # cv2.imread signals failure by returning None; fail with the path
            # rather than with an opaque error from cvtColor further down.
            raise ValueError(f"Failed to load image: {img_path}")

        # squeeze=False keeps the axes in an array even for a single panel
        # (num_augmentations == 0).
        fig, axes = plt.subplots(1, num_augmentations + 1, figsize=(20, 4), squeeze=False)
        axes = axes[0]

        # The image is BGR as loaded; convert to RGB only for display.
        axes[0].imshow(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))
        axes[0].set_title('Original')
        axes[0].axis('off')

        for i, ax in enumerate(axes[1:], start=1):
            augmented = self.augment_image(img)
            ax.imshow(cv2.cvtColor(augmented, cv2.COLOR_BGR2RGB))
            ax.set_title(f'Augmented {i}')
            ax.axis('off')

        plt.tight_layout()
        plt.show()

    def create_augmented_dataset(self, subset_size=None, augmentations_per_image=3):
        """Create augmented dataset"""
        if subset_size:
            labels_df = self.train_labels.head(subset_size)
        else:
            labels_df = self.train_labels

        augmented_dataset = []

        for idx, row in labels_df.iterrows():
            img_path = os.path.join(self.train_images_dir, row['id'])
            try:
                img = cv2.imread(img_path)
                if img is None:
                    continue

                augmented_dataset.append((img, row['label']))

                for _ in range(augmentations_per_image):
                    aug_img = self.augment_image(img)
                    augmented_dataset.append((aug_img, row['label']))

            except Exception as e:
                print(f"Error processing image {row['id']}: {str(e)}")
                continue

            if idx % 1000 == 0:
                print(f"Processed {idx} images...")

        print(f"\nTotal dataset size after augmentation: {len(augmented_dataset)}")
        return augmented_dataset

    def create_and_save_augmented_images(self, augmented_dir="../data/augmented_train", augmentations_per_image=4):
        """
        Create and save augmented versions of all images in a new directory

        Args:
            augmented_dir: Directory where augmented images will be saved
            augmentations_per_image: Number of augmented versions to create per original image
        """
        # Create augmented directory if it doesn't exist
        os.makedirs(augmented_dir, exist_ok=True)

        total_images = len(self.train_labels)

        for idx, row in self.train_labels.iterrows():
            # Create subdirectory for each class
            class_dir = os.path.join(augmented_dir, row['label'])
            os.makedirs(class_dir, exist_ok=True)

            img_path = os.path.join(self.train_images_dir, row['id'])
            try:
                # Load original image
                img = cv2.imread(img_path)
                if img is None:
                    continue

                # Save original image
                original_filename = f"original_{row['id']}"
                cv2.imwrite(os.path.join(class_dir, original_filename), img)

                # Create and save augmented versions
                for aug_idx in range(augmentations_per_image):
                    aug_img = self.augment_image(img)
                    aug_filename = f"aug{aug_idx + 1}_{row['id']}"
                    cv2.imwrite(os.path.join(class_dir, aug_filename), aug_img)

            except Exception as e:
                print(f"Error processing image {row['id']}: {str(e)}")
                continue

            # Print progress every 1000 images
            if idx % 1000 == 0:
                progress = (idx / total_images) * 100
                print(f"Processed {idx}/{total_images} images ({progress:.2f}%)...")

        print("\nAugmentation complete!")
        print(f"Images saved in: {augmented_dir}")

        # Print directory structure summary
        class_counts = {}
        for class_name in os.listdir(augmented_dir):
            class_path = os.path.join(augmented_dir, class_name)
            if os.path.isdir(class_path):
                num_images = len(os.listdir(class_path))
                class_counts[class_name] = num_images

        print("\nAugmented Dataset Summary:")
        for class_name, count in class_counts.items():
            print(f"{class_name}: {count} images")

    def prepare_training_dataset(self, output_dir="../data/final_train_dataset"):
        """
        Prepare a combined training dataset from original and augmented images

        Args:
            output_dir: Directory where the final training dataset will be created

        Returns:
            DataFrame containing paths and labels for the new training dataset
        """
        # Create output directory
        os.makedirs(output_dir, exist_ok=True)

        # Initialize list to store image information
        dataset_info = []

        # Process original training images
        print("Processing original training images...")
        for idx, row in self.train_labels.iterrows():
            try:
                # Load and process original image
                orig_path = os.path.join(self.train_images_dir, row['id'])
                if os.path.exists(orig_path):
                    # Create class directory if it doesn't exist
                    class_dir = os.path.join(output_dir, row['label'])
                    os.makedirs(class_dir, exist_ok=True)

                    # Copy original image with new name
                    new_filename = f"orig_{row['id']}"
                    new_path = os.path.join(class_dir, new_filename)
                    img = cv2.imread(orig_path)
                    if img is not None:
                        cv2.imwrite(new_path, img)
                        dataset_info.append({
                            'path': new_path,
                            'label': row['label'],
                            'type': 'original'
                        })
            except Exception as e:
                print(f"Error processing original image {row['id']}: {str(e)}")
                continue

            if idx % 1000 == 0:
                print(f"Processed {idx} original images...")

        # Process augmented images
        print("\nProcessing augmented images...")
        augmented_dir = "../data/augmented_train"

        for class_name in os.listdir(augmented_dir):
            class_path = os.path.join(augmented_dir, class_name)
            if not os.path.isdir(class_path):
                continue

            # Create class directory in output
            output_class_dir = os.path.join(output_dir, class_name)
            os.makedirs(output_class_dir, exist_ok=True)

            # Process augmented images for this class
            for img_name in os.listdir(class_path):
                if img_name.startswith('aug'):  # Only process augmented images
                    try:
                        aug_path = os.path.join(class_path, img_name)
                        new_path = os.path.join(output_class_dir, img_name)

                        img = cv2.imread(aug_path)
                        if img is not None:
                            cv2.imwrite(new_path, img)
                            dataset_info.append({
                                'path': new_path,
                                'label': class_name,
                                'type': 'augmented'
                            })
                    except Exception as e:
                        print(f"Error processing augmented image {img_name}: {str(e)}")
                        continue

        # Create DataFrame with dataset information
        dataset_df = pd.DataFrame(dataset_info)

        # Save dataset information to CSV
        csv_path = os.path.join(output_dir, 'training_dataset_info.csv')
        dataset_df.to_csv(csv_path, index=False)

        # Print summary statistics
        print("\nDataset Summary:")
        print(f"Total images: {len(dataset_df)}")
        print("\nImages per class:")
        class_counts = dataset_df['label'].value_counts()
        for label, count in class_counts.items():
            print(f"{label}: {count}")

        print("\nImages by type:")
        type_counts = dataset_df['type'].value_counts()
        for type_name, count in type_counts.items():
            print(f"{type_name}: {count}")

        print(f"\nDataset information saved to: {csv_path}")

        return dataset_df

# dataset.py

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
import pandas as pd
from PIL import Image
import os
import numpy as np
from tqdm import tqdm
from modelRegularization import ModelRegularization, apply_regularization


class CIFAR10Dataset(Dataset):
    """Dataset that reads images listed in a CSV index and maps labels to ints."""

    def __init__(self, csv_file, img_dir, transform=None):
        """
        Load the annotation table and remember where the images live.

        Args:
            csv_file: Path to the csv file with annotations; rows are read
                through their ``image_path`` and ``label`` columns
            img_dir: Directory that ``image_path`` values are relative to
            transform: Optional transform to be applied on a sample
        """
        self.data_frame = pd.read_csv(csv_file)
        self.img_dir = img_dir
        self.transform = transform

        # Class index is the position of the name in this list.
        ordered_names = [
            'airplane', 'automobile', 'bird', 'cat', 'deer',
            'dog', 'frog', 'horse', 'ship', 'truck',
        ]
        self.class_to_idx = {name: position for position, name in enumerate(ordered_names)}

    def __len__(self):
        """Return the number of rows in the annotation table."""
        return len(self.data_frame)

    def __getitem__(self, idx):
        """Return ``(image, label_index)`` for the row at position ``idx``.

        The image is opened as RGB and passed through ``transform`` when one
        was supplied. A label missing from ``class_to_idx`` raises ``KeyError``.
        """
        record = self.data_frame.iloc[idx]
        full_path = os.path.join(self.img_dir, record['image_path'])
        image = Image.open(full_path).convert('RGB')
        label = self.class_to_idx[record['label']]

        if self.transform:
            image = self.transform(image)

        return image, label


# model.py

In [None]:
import torch.nn as nn


class CIFAR10Model(nn.Module):
    """Three convolutional blocks followed by a two-layer classifier head.

    Each block halves the spatial size with a 2x2 max-pool. The first linear
    layer expects ``conv_filters[2] * 4 * 4`` features, i.e. a 4x4 feature map
    after three poolings, which corresponds to 32x32 input images.
    """

    def __init__(self, num_classes=10, conv_filters=(32, 64, 128), dropout_rate=0.3, fc_units=512):
        """
        Args:
            num_classes: Size of the output layer.
            conv_filters: Output channels of the three blocks; only the first
                three entries are used. The default is a tuple so that no
                mutable object is shared between calls.
            dropout_rate: Dropout probability used in every block and the head.
            fc_units: Width of the hidden fully connected layer.
        """
        super().__init__()

        first, second, third = conv_filters[0], conv_filters[1], conv_filters[2]

        # Input images have 3 channels.
        self.conv1 = self._make_conv_block(3, first, dropout_rate)
        self.conv2 = self._make_conv_block(first, second, dropout_rate)
        self.conv3 = self._make_conv_block(second, third, dropout_rate)

        self.fc = nn.Sequential(
            nn.Flatten(),
            nn.Linear(third * 4 * 4, fc_units),
            nn.BatchNorm1d(fc_units),
            nn.ReLU(),
            nn.Dropout(dropout_rate),
            nn.Linear(fc_units, num_classes)
        )

    def _make_conv_block(self, in_channels, out_channels, dropout_rate):
        """Return (Conv 3x3 -> BatchNorm -> ReLU) twice, then MaxPool 2x2 and Dropout."""
        return nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(),
            nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Dropout(dropout_rate)
        )

    def forward(self, x):
        """Run the three blocks and the classifier head; returns raw class scores."""
        x = self.conv1(x)
        x = self.conv2(x)
        x = self.conv3(x)
        x = self.fc(x)
        return x


# modelRegularization.py

In [None]:
import torch
import torch.nn as nn
from typing import Dict, List, Optional, Tuple
import matplotlib.pyplot as plt

class ModelRegularization:
    """Early stopping, L1/L2 penalty computation and training-history tracking."""

    def __init__(self, model: nn.Module, patience: int = 5, min_delta: float = 0.001,
                 l1_factor: float = 0.0, l2_factor: float = 0.01):
        """
        Initialize regularization and early stopping functionality.

        Args:
            model: The neural network model
            patience: Number of epochs to wait for improvement before early stopping
            min_delta: Minimum change in monitored quantity to qualify as an improvement
            l1_factor: L1 regularization factor
            l2_factor: L2 regularization factor
        """
        self.model = model
        self.patience = patience
        self.min_delta = min_delta
        self.l1_factor = l1_factor
        self.l2_factor = l2_factor
        self.best_loss = None  # lowest validation loss seen so far
        self.counter = 0  # consecutive checks without sufficient improvement
        self.early_stop = False
        self.history: Dict[str, List[float]] = {
            'train_loss': [], 'val_loss': [],
            'train_acc': [], 'val_acc': []
        }

    def should_stop(self, val_loss: float) -> bool:
        """Check if training should stop based on validation loss.

        The first call only records the loss. Afterwards a loss lower than the
        best one by more than ``min_delta`` resets the counter; any other
        value increments it, and reaching ``patience`` sets ``early_stop`` and
        returns ``True``.
        """
        if self.best_loss is None:
            self.best_loss = val_loss
            return False

        if val_loss < self.best_loss - self.min_delta:
            self.best_loss = val_loss
            self.counter = 0
            return False

        self.counter += 1
        if self.counter >= self.patience:
            self.early_stop = True
            return True
        return False

    def compute_regularization_loss(self) -> torch.Tensor:
        """Compute the weighted sum of the L1 and L2 penalties.

        Only parameters with ``requires_grad`` contribute. The L2 term is the
        sum of squared values (no square root is taken).

        Returns:
            Scalar tensor ``l1_factor * sum|p| + l2_factor * sum(p^2)`` on the
            device of the model's first parameter, or a CPU zero when the
            model has no parameters at all.
        """
        # next() with a default avoids StopIteration on a parameter-free model.
        first_param = next(self.model.parameters(), None)
        device = first_param.device if first_param is not None else torch.device("cpu")

        l1_loss = torch.tensor(0., device=device)
        l2_loss = torch.tensor(0., device=device)

        for param in self.model.parameters():
            if param.requires_grad:
                l1_loss = l1_loss + torch.sum(torch.abs(param))
                l2_loss = l2_loss + torch.sum(param.pow(2))

        return self.l1_factor * l1_loss + self.l2_factor * l2_loss

    def update_history(self, train_loss: float, val_loss: float,
                       train_acc: float, val_acc: float) -> None:
        """Append one epoch's metrics to the training history."""
        self.history['train_loss'].append(train_loss)
        self.history['val_loss'].append(val_loss)
        self.history['train_acc'].append(train_acc)
        self.history['val_acc'].append(val_acc)

    def plot_training_history(self, save_path: Optional[str] = None):
        """Plot loss and accuracy curves side by side.

        Args:
            save_path: When given, the figure is saved there before it is shown.
        """
        plt.figure(figsize=(12, 4))

        # Plot loss
        plt.subplot(1, 2, 1)
        plt.plot(self.history['train_loss'], label='Train Loss')
        plt.plot(self.history['val_loss'], label='Validation Loss')
        plt.title('Model Loss')
        plt.xlabel('Epoch')
        plt.ylabel('Loss')
        plt.legend()

        # Plot accuracy
        plt.subplot(1, 2, 2)
        plt.plot(self.history['train_acc'], label='Train Accuracy')
        plt.plot(self.history['val_acc'], label='Validation Accuracy')
        plt.title('Model Accuracy')
        plt.xlabel('Epoch')
        plt.ylabel('Accuracy (%)')
        plt.legend()

        plt.tight_layout()
        if save_path:
            plt.savefig(save_path)
        plt.show()

    def check_overfitting(self, threshold: float = 0.1) -> Tuple[bool, float]:
        """
        Check if model is overfitting based on train/val performance gap.

        Args:
            threshold: Maximum acceptable difference between train and val
                accuracy, in the same unit as the recorded accuracies.
                NOTE(review): the plot labels accuracy as a percentage, for
                which a default of 0.1 looks very strict; confirm the unit
                against the caller.

        Returns:
            Tuple of (is_overfitting, gap). ``(False, 0.0)`` is returned while
            fewer than two epochs have been recorded.
        """
        if len(self.history['train_acc']) < 2:
            return False, 0.0

        # Only the most recent epoch is compared.
        gap = self.history['train_acc'][-1] - self.history['val_acc'][-1]

        return gap > threshold, gap

    def get_learning_curves(self) -> Dict[str, List[float]]:
        """Return the history dict itself (not a copy)."""
        return self.history

def apply_regularization(model: nn.Module) -> nn.Module:
    """
    Return a copy of ``model`` with dropout and batch norm wrapped around its layers.

    Every ``nn.Linear`` and ``nn.Conv2d`` found in the copy is replaced by
    ``Sequential(Sequential(layer, BatchNorm), Dropout(p=0.3))``. The wrapping
    is unconditional: it does not check whether the model already applies
    dropout or batch norm, and it also wraps the final output layer.

    Args:
        model: Original model; it is left untouched.

    Returns:
        Modified deep copy of the model. The copy keeps the weights, device
        and train/eval mode of the original; the newly added layers start in
        training mode.
    """
    import copy

    def add_dropout(module):
        # Snapshot the children first: entries are replaced during the walk.
        for name, child in list(module.named_children()):
            if isinstance(child, (nn.Linear, nn.Conv2d)):
                module._modules[name] = nn.Sequential(
                    child,
                    nn.Dropout(p=0.3)
                )
            else:
                add_dropout(child)

    def add_batchnorm(module):
        # Runs after add_dropout, so the layers found here are the ones that
        # now sit inside the Sequential wrappers created above.
        for name, child in list(module.named_children()):
            if isinstance(child, nn.Conv2d):
                module._modules[name] = nn.Sequential(
                    child,
                    nn.BatchNorm2d(child.out_channels)
                )
            elif isinstance(child, nn.Linear):
                module._modules[name] = nn.Sequential(
                    child,
                    nn.BatchNorm1d(child.out_features)
                )
            else:
                add_batchnorm(child)

    # Re-instantiating with type(model)() used the default constructor
    # arguments, which fails or mismatches the state dict for any model built
    # with non-default arguments. A deep copy preserves the architecture.
    model_copy = copy.deepcopy(model)

    add_dropout(model_copy)
    add_batchnorm(model_copy)

    return model_copy


# pipeline.py

In [None]:
import torch
from preprocessing import CIFAR10PreProcessor, CSVGenerator
from trainer import CIFAR10Trainer
from config import TrainingConfig


class CIFAR10Pipeline:
    """Pipeline class for handling training workflow

    Ties together preprocessing, training, model loading and prediction,
    all driven by one ``TrainingConfig``.
    """

    def __init__(self, config: "TrainingConfig"):
        """
        Args:
            config: Paths and hyperparameters used by every pipeline step.
        """
        self.config = config
        self.device = self.get_device()
        self.trainer = None  # created by train_model() or load_trained_model()

    @staticmethod
    def get_device():
        """Return the preferred torch device: MPS, then CUDA, then CPU."""
        # torch.backends.mps does not exist on older torch builds; look it up
        # defensively instead of failing with AttributeError.
        mps_backend = getattr(torch.backends, "mps", None)
        if mps_backend is not None and mps_backend.is_available():
            return torch.device('mps')
        if torch.cuda.is_available():
            return torch.device('cuda')
        return torch.device('cpu')

    def _build_trainer(self):
        """Create a trainer for the augmented dataset described by the config."""
        return CIFAR10Trainer(
            train_csv=str(self.config.DATA_DIR / "train_labels.csv"),
            train_dir=str(self.config.AUGMENTED_TRAIN_DIR),
            batch_size=self.config.BATCH_SIZE,
            learning_rate=self.config.LEARNING_RATE,
            num_classes=self.config.NUM_CLASSES,
            device=self.device
        )

    def preprocess_data(self, augment_data=True):
        """Handle data preprocessing and augmentation

        Loads the label table, shows sample images and image statistics, and,
        when ``augment_data`` is true, writes the augmented image tree and
        generates its ``train_labels`` CSV index.
        """
        processor = CIFAR10PreProcessor(
            self.config.TRAIN_LABELS_PATH,
            self.config.TRAIN_IMAGES_DIR,
            self.config.TEST_IMAGES_DIR
        )

        # Analyze and display data properties (results are only printed).
        processor.load_and_display_images(num_samples=5, random_state=42)
        processor.analyze_image_properties(sample_size=self.config.SAMPLE_SIZE)

        if not augment_data:
            return

        processor.create_and_save_augmented_images(
            augmented_dir=str(self.config.AUGMENTED_TRAIN_DIR),
            augmentations_per_image=self.config.AUGMENTATIONS_PER_IMAGE
        )

        # Generate CSV for augmented data
        generator = CSVGenerator()
        generator.script_to_generate_csv(
            fileName="train_labels",
            base_dir=str(self.config.AUGMENTED_TRAIN_DIR)
        )

    def train_model(self, hypertune=False):
        """Train the model

        Args:
            hypertune: Run the trainer's hyperparameter search before training.

        A new trainer is created on every call, replacing any existing one.
        """
        print(f"Using device: {self.device}")

        self.trainer = self._build_trainer()

        if hypertune:
            self.trainer.tune_hyperparameters(
                train_csv=str(self.config.DATA_DIR / "train_labels.csv"),
                train_dir=str(self.config.AUGMENTED_TRAIN_DIR)
            )

        # NOTE(review): checkpoints go to the literal "hyper-models" directory
        # rather than config.MODELS_DIR, which is where load_trained_model()
        # looks by default - confirm this is intended.
        self.trainer.train(
            epochs=self.config.EPOCHS,
            save_dir=str("hyper-models")
        )

    def predict_image(self, image_path):
        """Make prediction for a single image

        Returns:
            The predicted class name (it is also printed).

        Raises:
            ValueError: If no trainer has been created yet.
        """
        if self.trainer is None:
            raise ValueError(
                "Model not trained. Please train the model first.")

        # Index order must match the label-to-index mapping used in training.
        class_names = ['airplane', 'automobile', 'bird', 'cat', 'deer',
                       'dog', 'frog', 'horse', 'ship', 'truck']
        predicted_name = class_names[self.trainer.predict(image_path)]
        print(f'Predicted class: {predicted_name}')
        return predicted_name

    def load_trained_model(self, model_path=None):
        """Load a trained model

        Args:
            model_path: Checkpoint to load; defaults to ``best_model.pt``
                inside the configured models directory.

        A trainer is created first if none exists yet.
        """
        if model_path is None:
            model_path = self.config.MODELS_DIR / 'best_model.pt'

        if self.trainer is None:
            self.trainer = self._build_trainer()

        self.trainer.load_model(str(model_path))

    def predict_test_directory(self, output_csv="predictions.csv"):
        """Predict classes for all test images and save results

        Args:
            output_csv: File the trainer writes the predictions to.

        Returns:
            Whatever the trainer's ``predict_test_directory`` returns.

        Raises:
            ValueError: If no trainer has been created yet.
        """
        if self.trainer is None:
            raise ValueError(
                "Model not trained. Please load or train the model first.")

        return self.trainer.predict_test_directory(
            test_dir=str(self.config.TEST_IMAGES_DIR),
            output_csv=output_csv
        )


# trainer.py

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision import transforms
import pandas as pd
from PIL import Image
import os
from tqdm import tqdm
from modelRegularization import ModelRegularization, apply_regularization
from dataset import CIFAR10Dataset
from model import CIFAR10Model
from sklearn.metrics import classification_report, confusion_matrix
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn.metrics import accuracy_score, precision_recall_fscore_support

class CIFAR10Trainer:
    """Train, validate and run inference for a CIFAR-10 image classifier.

    The trainer owns the train/validation data loaders, the model, its
    optimizer, a ReduceLROnPlateau scheduler and a ModelRegularization helper.
    The helper supplies the extra regularization loss, keeps the training
    history, reports overfitting and decides on early stopping.
    """

    # Label for each class index; shared by the CSV export and metric reports.
    CLASS_NAMES = ('airplane', 'automobile', 'bird', 'cat', 'deer',
                   'dog', 'frog', 'horse', 'ship', 'truck')

    def __init__(self, train_csv, train_dir, batch_size=32, learning_rate=0.001,
                 num_classes=10, device=None):
        """
        Initialize the trainer with regularization.

        Args:
            train_csv: Labels file handed to CIFAR10Dataset.
            train_dir: Image directory handed to CIFAR10Dataset.
            batch_size: Batch size used by both data loaders.
            learning_rate: Initial learning rate of the AdamW optimizer.
            num_classes: Number of classes the model predicts.
            device: torch device to run on. When None, MPS is preferred,
                then CUDA, then CPU.
        """
        self.batch_size = batch_size
        self.learning_rate = learning_rate
        self.num_classes = num_classes
        self.train_csv = train_csv
        self.train_dir = train_dir

        # Set device
        if device is None:
            self.device = torch.device("mps" if torch.backends.mps.is_available()
                                       else "cuda" if torch.cuda.is_available()
                                       else "cpu")
        else:
            self.device = device
        print(f"Using device: {self.device}")

        # Training samples are randomly augmented on every access.
        self.train_transform = transforms.Compose([
            transforms.RandomHorizontalFlip(),
            transforms.RandomRotation(10),
            transforms.RandomAffine(0, translate=(0.1, 0.1)),
            transforms.ColorJitter(brightness=0.2, contrast=0.2),
            transforms.RandomResizedCrop(32, scale=(0.8, 1.0)),
            transforms.ToTensor(),
            transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
        ])

        # Validation and inference only convert and normalize.
        self.val_transform = transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
        ])

        # Setup datasets and dataloaders
        self.setup_data(train_csv, train_dir)

        # Initialize model with regularization
        self.model = apply_regularization(CIFAR10Model(
            num_classes=num_classes)).to(self.device)

        # Setup loss, optimizer and learning rate scheduler
        self.criterion = nn.CrossEntropyLoss()
        self.optimizer = optim.AdamW(
            self.model.parameters(),
            lr=learning_rate,
            weight_decay=0.01  # L2 regularization
        )
        self.scheduler = self._build_scheduler(self.optimizer)

        # Initialize regularization handler
        self.regularization = self._build_regularization(self.model)

    def _build_scheduler(self, optimizer):
        """Create the plateau-based learning-rate scheduler for *optimizer*."""
        return optim.lr_scheduler.ReduceLROnPlateau(
            optimizer,
            mode='min',
            factor=0.2,
            patience=3,
            min_lr=1e-6
        )

    def _build_regularization(self, model):
        """Create a fresh regularization/early-stopping helper for *model*."""
        return ModelRegularization(
            model=model,
            patience=5,
            min_delta=0.001,
            l1_factor=0.0001,
            l2_factor=0.01
        )

    def tune_hyperparameters(self, train_csv, train_dir):
        """Train a fixed set of candidate configurations for 5 epochs each.

        Args:
            train_csv: Labels file used to rebuild the data loaders.
            train_dir: Image directory used to rebuild the data loaders.

        Returns:
            A list with one dict per configuration:
            {'config': <the configuration>, 'metrics': <summary from train()>}.

        After the call the trainer's model, optimizer, scheduler,
        regularization helper and batch size are those of the last
        configuration tried.
        """
        self.train_csv = train_csv  # Store paths
        self.train_dir = train_dir
        configs = [
            {
                'optimizer': ('AdamW', {'lr': 0.001, 'weight_decay': 0.01}),
                'conv_filters': [32, 64, 128],
                'dropout_rate': 0.3,
                'batch_size': 32
            },
            {
                'optimizer': ('SGD', {'lr': 0.01, 'momentum': 0.9}),
                'conv_filters': [64, 128, 256],
                'dropout_rate': 0.5,
                'batch_size': 64
            },
            {
                'optimizer': ('Adam', {'lr': 0.0005}),
                'conv_filters': [16, 32, 64],
                'dropout_rate': 0.2,
                'batch_size': 128
            }
        ]

        results = []
        for config in tqdm(configs, desc="Testing configurations"):
            # Initialize model with config
            # NOTE(review): unlike __init__, candidates are not passed through
            # apply_regularization -- confirm whether that is intended.
            model = CIFAR10Model(
                num_classes=self.num_classes,
                conv_filters=config['conv_filters'],
                dropout_rate=config['dropout_rate']
            ).to(self.device)

            # Setup optimizer
            opt_name, opt_params = config['optimizer']
            optimizer = getattr(optim, opt_name)(
                model.parameters(), **opt_params)

            self.model = model
            self.optimizer = optimizer
            # The scheduler and the regularization helper are bound to a
            # specific optimizer/model, so they are rebuilt for each candidate.
            # Reusing the old ones would schedule the previous optimizer,
            # regularize the previous model and carry early-stopping state
            # and history across configurations.
            self.scheduler = self._build_scheduler(optimizer)
            self.regularization = self._build_regularization(model)
            self.batch_size = config['batch_size']
            self.setup_data(self.train_csv, self.train_dir)

            # Train for fewer epochs during tuning.
            # NOTE: every candidate writes its best checkpoint to the same
            # default location, so later candidates overwrite earlier ones.
            metrics = self.train(epochs=5)

            results.append({
                'config': config,
                'metrics': metrics
            })
            print("Hyperparameter tuning:")
            print(f"Config: {config}")
            print(f"Metrics: {metrics}")

        return results

    @staticmethod
    def _split_train_val(train_view, eval_view, val_split):
        """Randomly split into a training subset and a validation subset.

        Both arguments must index the same samples in the same order.
        The training subset reads from *train_view*, the validation subset
        reads the remaining indices from *eval_view*, so each subset can use
        its own transform.

        Returns:
            (train_subset, val_subset) as torch Subset objects.
        """
        total = len(train_view)
        train_size = int((1 - val_split) * total)
        val_size = total - train_size
        train_subset, held_out = torch.utils.data.random_split(
            train_view, [train_size, val_size]
        )
        val_subset = torch.utils.data.Subset(eval_view, held_out.indices)
        return train_subset, val_subset

    def setup_data(self, train_csv, train_dir, val_split=0.2):
        """Setup train and validation datasets and dataloaders.

        Args:
            train_csv: Labels file handed to CIFAR10Dataset.
            train_dir: Image directory handed to CIFAR10Dataset.
            val_split: Fraction of the samples held out for validation.
        """
        import copy

        train_view = CIFAR10Dataset(train_csv, train_dir, self.train_transform)
        # random_split subsets share their underlying dataset, so changing the
        # transform through the validation subset would also disable the
        # training augmentation. A shallow copy gives validation its own
        # transform while sharing the sample list.
        eval_view = copy.copy(train_view)
        eval_view.transform = self.val_transform

        train_dataset, val_dataset = self._split_train_val(
            train_view, eval_view, val_split)

        self.train_loader = DataLoader(
            train_dataset,
            batch_size=self.batch_size,
            shuffle=True,
            num_workers=4,
            pin_memory=True
        )
        self.val_loader = DataLoader(
            val_dataset,
            batch_size=self.batch_size,
            shuffle=False,
            num_workers=4,
            pin_memory=True
        )

    def train_epoch(self, epoch):
        """Train for one epoch with regularization.

        Args:
            epoch: Epoch number, used only for the progress-bar label.

        Returns:
            (mean criterion loss per batch, accuracy in percent). The
            regularization term is optimized but not included in the
            reported loss.
        """
        self.model.train()
        running_loss = 0.0
        correct = 0
        total = 0

        pbar = tqdm(self.train_loader, desc=f'Epoch {epoch}')
        for batch_idx, (inputs, labels) in enumerate(pbar, start=1):
            inputs, labels = inputs.to(self.device), labels.to(self.device)

            self.optimizer.zero_grad()
            outputs = self.model(inputs)

            # Add regularization loss
            loss = self.criterion(outputs, labels)
            reg_loss = self.regularization.compute_regularization_loss()
            total_loss = loss + reg_loss

            total_loss.backward()
            self.optimizer.step()

            running_loss += loss.item()
            _, predicted = outputs.max(1)
            total += labels.size(0)
            correct += predicted.eq(labels).sum().item()

            # Average over the batches seen so far, so the displayed loss is
            # meaningful during the epoch and not only at its end.
            pbar.set_postfix({
                'loss': running_loss/batch_idx,
                'acc': 100.*correct/total
            })

        return running_loss/len(self.train_loader), 100.*correct/total

    def validate(self):
        """Evaluate the model on the validation loader.

        Returns:
            (mean criterion loss per batch, accuracy in percent).
        """
        self.model.eval()
        running_loss = 0.0
        correct = 0
        total = 0

        with torch.no_grad():
            for inputs, labels in self.val_loader:
                inputs, labels = inputs.to(self.device), labels.to(self.device)
                outputs = self.model(inputs)
                loss = self.criterion(outputs, labels)

                running_loss += loss.item()
                _, predicted = outputs.max(1)
                total += labels.size(0)
                correct += predicted.eq(labels).sum().item()

        return running_loss/len(self.val_loader), 100.*correct/total

    def train(self, epochs=50, save_dir='models'):
        """Train the model with regularization and early stopping.

        Args:
            epochs: Maximum number of epochs to run.
            save_dir: Directory for 'best_model.pt' and
                'training_history.png'; created if missing.

        Returns:
            A summary dict with 'best_val_acc', 'best_epoch' (None if no
            checkpoint was saved), 'epochs_run' and, when at least one epoch
            ran, the last epoch's 'train_loss', 'train_acc', 'val_loss' and
            'val_acc'.
        """
        os.makedirs(save_dir, exist_ok=True)
        best_val_acc = 0
        best_epoch = None
        epochs_run = 0
        last_epoch_stats = {}

        for epoch in range(1, epochs + 1):
            train_loss, train_acc = self.train_epoch(epoch)
            val_loss, val_acc = self.validate()
            epochs_run = epoch
            last_epoch_stats = {
                'train_loss': train_loss,
                'train_acc': train_acc,
                'val_loss': val_loss,
                'val_acc': val_acc
            }

            # Update regularization history
            self.regularization.update_history(
                train_loss, val_loss, train_acc, val_acc)

            # Check for overfitting
            is_overfitting, gap = self.regularization.check_overfitting()
            if is_overfitting:
                print(
                    f"Warning: Possible overfitting detected (gap: {gap:.2f}%)")

            # Learning rate scheduling
            self.scheduler.step(val_loss)

            print(f'Epoch {epoch}:')
            print(
                f'Train Loss: {train_loss:.4f} | Train Acc: {train_acc:.2f}%')
            print(f'Val Loss: {val_loss:.4f} | Val Acc: {val_acc:.2f}%')

            # Save best model
            if val_acc > best_val_acc:
                best_val_acc = val_acc
                best_epoch = epoch
                self.save_model(os.path.join(save_dir, 'best_model.pt'))
                print(f'Saved model with val_acc: {val_acc:.2f}%')

            # Early stopping check
            if self.regularization.should_stop(val_loss):
                print("Early stopping triggered!")
                break

            print('-' * 70)

        # Plot training history
        self.regularization.plot_training_history(
            save_path=os.path.join(save_dir, 'training_history.png')
        )

        # Returned so callers such as tune_hyperparameters can compare runs.
        return {
            'best_val_acc': best_val_acc,
            'best_epoch': best_epoch,
            'epochs_run': epochs_run,
            **last_epoch_stats
        }

    def save_model(self, path):
        """Save model, optimizer and scheduler state dicts to *path*."""
        torch.save({
            'model_state_dict': self.model.state_dict(),
            'optimizer_state_dict': self.optimizer.state_dict(),
            'scheduler_state_dict': self.scheduler.state_dict(),
        }, path)

    def load_model(self, path):
        """Load model, optimizer and scheduler state from *path*.

        The checkpoint is read with weights_only=True and mapped onto the
        trainer's device.
        """
        checkpoint = torch.load(
            path, map_location=self.device, weights_only=True)
        self.model.load_state_dict(checkpoint['model_state_dict'])
        self.optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
        self.scheduler.load_state_dict(checkpoint['scheduler_state_dict'])

    def predict(self, image_path):
        """Predict the class index for a single image file.

        Args:
            image_path: Path of the image to classify.

        Returns:
            The predicted class index as a Python int.
        """
        self.model.eval()

        # Close the file handle as soon as the pixels have been converted.
        with Image.open(image_path) as raw_image:
            image = raw_image.convert('RGB')
        image = self.val_transform(image).unsqueeze(0).to(self.device)

        with torch.no_grad():
            output = self.model(image)
            _, predicted = output.max(1)

        return predicted.item()

    def predict_test_directory(self, test_dir, output_csv="predictions.csv"):
        """Predict classes for all images in test directory and save to CSV.

        Args:
            test_dir: Directory scanned for .png/.jpg/.jpeg files
                (extension match is case-insensitive).
            output_csv: Path of the CSV file to write.

        Returns:
            A DataFrame with columns 'image_name' and 'predicted_class'.
            Images that fail to predict are reported and left out.
        """
        predictions = []
        image_files = sorted(
            f for f in os.listdir(test_dir)
            if f.lower().endswith(('.png', '.jpg', '.jpeg')))

        for image_file in tqdm(image_files, desc="Predicting test images"):
            image_path = os.path.join(test_dir, image_file)
            # Best effort: one unreadable image must not abort the whole run.
            try:
                class_idx = self.predict(str(image_path))
                predictions.append({
                    'image_name': image_file,
                    'predicted_class': self.CLASS_NAMES[class_idx]
                })
            except Exception as e:
                print(f"Error predicting {image_file}: {str(e)}")

        # Explicit columns keep the CSV header even when nothing was predicted.
        predictions_df = pd.DataFrame(
            predictions, columns=['image_name', 'predicted_class'])
        predictions_df.to_csv(output_csv, index=False)
        print(f"\nPredictions saved to {output_csv}")
        return predictions_df

    def compute_validation_metrics(self):
        """Compute all evaluation metrics using validation set.

        Prints the metrics, writes 'validation_confusion_matrix.png' and
        'training_history.png' to the current directory.

        Returns:
            A dict with 'accuracy', 'precision', 'recall', 'f1' (weighted
            averages), 'detailed_report' (text) and 'confusion_matrix'.
        """
        all_preds = []
        all_labels = []
        self.model.eval()

        # Collect predictions
        with torch.no_grad():
            for inputs, labels in self.val_loader:
                inputs, labels = inputs.to(self.device), labels.to(self.device)
                outputs = self.model(inputs)
                _, preds = outputs.max(1)
                all_preds.extend(preds.cpu().numpy())
                all_labels.extend(labels.cpu().numpy())

        # Calculate metrics
        accuracy = accuracy_score(all_labels, all_preds)
        precision, recall, f1, _ = precision_recall_fscore_support(
            all_labels, all_preds, average='weighted', zero_division=0)

        print("\nModel Performance Metrics:")
        print(f"Overall Accuracy: {accuracy:.4f}")
        print(f"Weighted Precision: {precision:.4f}")
        print(f"Weighted Recall: {recall:.4f}")
        print(f"Weighted F1-Score: {f1:.4f}")

        # Detailed report and confusion matrix.
        # The label ids are passed explicitly so the report and the matrix
        # always cover every class, even when the random validation split
        # happens to miss one (otherwise target_names would not line up).
        class_names = list(self.CLASS_NAMES)
        label_ids = list(range(len(class_names)))
        report = classification_report(
            all_labels, all_preds, labels=label_ids,
            target_names=class_names, zero_division=0)
        cm = confusion_matrix(all_labels, all_preds, labels=label_ids)

        print("\nDetailed Classification Report:")
        print(report)

        # Plot confusion matrix
        plt.figure(figsize=(10, 8))
        sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                    xticklabels=class_names, yticklabels=class_names)
        plt.title('Validation Set Confusion Matrix')
        plt.ylabel('True Label')
        plt.xlabel('Predicted Label')
        plt.tight_layout()
        plt.savefig('validation_confusion_matrix.png')
        plt.close()

        # Plot training history
        self.regularization.plot_training_history('training_history.png')

        metrics = {
            'accuracy': accuracy,
            'precision': precision,
            'recall': recall,
            'f1': f1,
            'detailed_report': report,
            'confusion_matrix': cm
        }

        return metrics


# test_setup.py

In [None]:
import torch
import torchvision
import sys

def get_device():
    """Return the torch device to use: CUDA if available, else MPS, else CPU."""
    if torch.cuda.is_available():
        backend = 'cuda'
    elif torch.backends.mps.is_available():
        backend = 'mps'
    else:
        backend = 'cpu'
    return torch.device(backend)

def test_pytorch_setup():
    """Print an environment report and smoke-test moving a tensor to the device.

    Reports the Python, PyTorch and Torchvision versions and whether CUDA and
    MPS are available. It then creates a small random tensor and moves it to
    the device chosen by get_device(). A failure during the move is printed
    rather than raised.
    """
    # Interpreter and library versions
    version_report = (
        ("Python", sys.version),
        ("PyTorch", torch.__version__),
        ("Torchvision", torchvision.__version__),
    )
    for component, version in version_report:
        print(f"{component} version: {version}")

    # Accelerator availability
    print("\nDevice Information:")
    has_cuda = torch.cuda.is_available()
    print(f"CUDA available: {has_cuda}")
    has_mps = torch.backends.mps.is_available()
    print(f"MPS available: {has_mps}")

    # Smoke test: build a tensor and move it to the selected device
    target = get_device()
    print(f"\nUsing device: {target}")

    sample = torch.randn(2, 3)
    try:
        sample = sample.to(target)
        print("Successfully created and moved tensor to device")
        print(sample)
    except Exception as err:
        print(f"Error when testing tensor operations: {err}")

# Run the environment check only when this file is executed as a script.
if __name__ == "__main__":
    test_pytorch_setup()


# main.py

In [None]:
from pathlib import Path
from config import TrainingConfig
from pipeline import CIFAR10Pipeline


def main():
    """Run the CIFAR-10 workflow: resolve config, train, report validation metrics.

    The configuration is read from 'config.yaml' when that file exists.
    Otherwise a default configuration is created and written there.
    """
    config_path = Path("config.yaml")
    if not config_path.exists():
        # First run: start from the defaults and persist them.
        config = TrainingConfig()
        config.save_config(config_path)
    else:
        config = TrainingConfig.load_config(config_path)

    pipeline = CIFAR10Pipeline(config)

    # Step 1 (optional): preprocess and augment data.
    # Enable the call below when preprocessing has not been done yet.
    # pipeline.preprocess_data()

    # Step 2: train the model, including the hyperparameter search.
    pipeline.train_model(hypertune=True)

    # Step 3: reload the saved checkpoint and evaluate it on the validation set.
    pipeline.load_trained_model("models/best_model.pt")
    metrics = pipeline.trainer.compute_validation_metrics()
    print("Validation metrics: ", metrics, "\n")

    # Optional: export test-set predictions.
    # predictions_df = pipeline.predict_test_directory("data/predictions.csv")


# Start the training workflow only when this file is executed as a script.
if __name__ == "__main__":
    main()
