#Install and imports

In [None]:
!pip install kaggle

In [None]:
import os
import glob
import time
import tqdm
import torch
import random
import logging
import zipfile
import numpy as np
import torchvision
import pandas as pd
from PIL import Image
import torch.nn as nn
from shutil import copyfile
import torch.optim as optim
from torch.optim import Adam
import torch.nn.functional as F
import matplotlib.pyplot as plt
from collections import defaultdict
from torch.utils.data import Dataset
from torch.nn import CrossEntropyLoss
from sklearn.model_selection import KFold
import torchvision.transforms as transforms
from torch.optim.lr_scheduler import StepLR
from torchvision import datasets, transforms
from torch.optim.lr_scheduler import CyclicLR
from torch.utils.data import DataLoader, Subset
from torch.optim.lr_scheduler import ReduceLROnPlateau


#Loading the data

In [None]:
# Upload a file through Colab; the shell lines below expect it to be the
# Kaggle API token named kaggle.json.
from google.colab import files
files.upload()

# Move the token into ~/.kaggle and make it readable by the owner only,
# then download the Stanford Cars archive with the Kaggle CLI.
!mkdir -p ~/.kaggle
!mv kaggle.json ~/.kaggle/
!chmod 600 ~/.kaggle/kaggle.json
!kaggle datasets download -d jutrera/stanford-car-dataset-by-classes-folder

# Unpack the downloaded archive into the "stanford_cars" directory.
with zipfile.ZipFile("stanford-car-dataset-by-classes-folder.zip", "r") as zip_ref:
    zip_ref.extractall("stanford_cars")

# Working on the meta data of an image, decomposing the name of the image, and connecting to the correct classes

In [None]:
# Collect every image path as <split>/<class folder>/<image file>.
# glob returns matches in arbitrary, filesystem-dependent order, so the lists
# are sorted to keep the derived tables and CSV files reproducible across runs.
train_car = sorted(glob.glob("/content/stanford_cars/car_data/car_data/train/*/*"))
test_car = sorted(glob.glob("/content/stanford_cars/car_data/car_data/test/*/*"))
def get_car_class(car):
    """
    Describe one image path of the form ``.../<class folder>/<image file>``.

    The class folder name is split on whitespace: the first word is taken as
    the manufacturer, the last word as the year and everything in between as
    the car type. The returned dict also carries the full class name, the
    image file name and the part of the file name before its first dot.
    """
    # The last two '/'-separated components are the class folder and the file.
    path_parts = car.rsplit('/', 2)
    class_folder = path_parts[-2]
    file_name = path_parts[-1]

    # Everything before the first dot of the file name.
    stem = file_name.partition('.')[0]

    words = class_folder.split()
    maker = words[0]
    model = " ".join(words[1:-1])
    model_year = words[-1]

    info = {}
    info["full_name"] = class_folder
    info["manufacturer"] = maker
    info["type"] = model
    info["year"] = model_year
    info["image_number"] = stem
    info["image_name"] = file_name
    return info
# One metadata record (class name parts + file name) per image path.
y_train = [get_car_class(path) for path in train_car]
y_test = [get_car_class(path) for path in test_car]

df_train = pd.DataFrame(y_train)
df_test = pd.DataFrame(y_test)

# Persist the tables and read them back from the *same* paths that were
# written. The original wrote to the working directory but read from
# /content/, which only matches when the notebook runs with cwd=/content.
# NOTE: the CSV round trip is kept on purpose; read_csv re-infers column
# dtypes, so the *_labels frames may differ in dtype from df_train/df_test.
train_csv_path = "train_data.csv"
test_csv_path = "test_data.csv"
df_train.to_csv(train_csv_path, index=False)
df_test.to_csv(test_csv_path, index=False)

print("Train and Test data saved as CSV files.")
df_train_labels = pd.read_csv(train_csv_path)
df_test_labels = pd.read_csv(test_csv_path)

# Bounding-box annotation files have no header row; name the columns here.
columns = ["image_name", "x_min", "y_min", "x_max", "y_max", "class_id"]
train_annotations = pd.read_csv('/content/stanford_cars/anno_train.csv', header=None, names=columns)
test_annotations = pd.read_csv('/content/stanford_cars/anno_test.csv', header=None, names=columns)

# Inner join on the file name: images without an annotation row (and
# annotation rows without an image) are dropped.
merged_data_train = pd.merge(df_train_labels, train_annotations, on='image_name', how='inner')
merged_data_test = pd.merge(df_test_labels, test_annotations, on='image_name', how='inner')

#Car dataset

In [None]:
"""
CarDataset

This class is designed to manage and load image data from the Stanford Cars Dataset efficiently,
allowing seamless integration with PyTorch for machine learning tasks. It provides tools for
loading, preprocessing, and interacting with the dataset, including mapping class names to IDs,
retrieving images, and performing image transformations.

Methods:
--------
__init__(annotations, root_dir, transform=None):
    Initializes the dataset with metadata, root directory, and optional transformations.
    Creates mappings between car types and their respective class IDs.

__len__():
    Returns the total number of images in the dataset.

__getitem__(idx):
    Retrieves the image and its metadata for a given index, applies transformations, and
    crops the image if bounding box data is available.

_generate_samples():
    Generates a list of tuples (image_path, class_id) for all images in the dataset.

generate_car_indax_image_dict():
    Creates a dictionary mapping each class ID to a list of image paths belonging to that class.

generate_car_image_dict():
    Creates a dictionary mapping each car type (full name) to a list of image paths belonging to it.

display_image_from_class(class_id):
    Displays an image from the given car class using the class ID. The first available image is shown.

print_number_of_images_in_class(class_id):
    Prints the number of images belonging to the specified car class based on its class ID.

crop_image_by_class_and_index(class_id, image_index):
    Crops a specific image from a given car class and index using its bounding box coordinates.

Properties:
-----------
classes:
    Returns a list of all car type names (class names) in the dataset.

class_to_idx:
    Returns a dictionary mapping car type names to their corresponding class IDs.

samples:
    Returns a list of tuples (image_path, class_id) for all images in the dataset.

targets:
    Returns a list of class IDs corresponding to each image in the dataset.

Usage:
------
This class is ideal for:
1. Loading image data and associated metadata from the Stanford Cars Dataset.
2. Preprocessing images, including cropping and applying transformations.
3. Managing dataset annotations for use in machine learning pipelines.
4. Integrating with PyTorch's DataLoader for efficient data batching and loading.

"""


class CarDataset(Dataset):
    """
    Dataset over a merged annotation table of car images.

    Every row of ``annotations`` must provide ``full_name`` (class folder),
    ``image_name``, ``class_id`` and the bounding box columns ``x_min``,
    ``y_min``, ``x_max``, ``y_max``. Images are looked up at
    ``root_dir/<full_name>/<image_name>``.
    """

    # Bounding-box columns in the (left, upper, right, lower) order used by crop().
    _BBOX_COLUMNS = ("x_min", "y_min", "x_max", "y_max")

    def __init__(self, annotations, root_dir, transform=None):
        """
        annotations: The merged table containing all metadata about the images.
        root_dir: The base directory where the images are stored.
        transform: Transformations (if required).
        """
        self.annotations = annotations
        self.root_dir = root_dir
        self.transform = transform

        # Class name -> class id as found in the table (last occurrence wins),
        # and the reverse mapping.
        self.class_to_idx1 = dict(zip(self.annotations['full_name'], self.annotations['class_id']))
        self.idx_to_class = {idx: car_type for car_type, idx in self.class_to_idx1.items()}
        self.samples1 = self._generate_samples()
        self.targets1 = [class_id for _, class_id in self.samples1]  # Class ids

    def __len__(self):
        """Return the number of annotation rows (one per image)."""
        return len(self.annotations)

    def _image_path(self, row):
        """Return the on-disk path of the image described by an annotation row."""
        return os.path.join(self.root_dir, row['full_name'], row['image_name'])

    def _group_paths_by(self, column):
        """Return {row[column]: [image paths]} in table order."""
        grouped = {}
        for _, row in self.annotations.iterrows():
            grouped.setdefault(row[column], []).append(self._image_path(row))
        return grouped

    def __getitem__(self, idx):
        """
        Return ``(image, class_id)`` for the row at position ``idx``.

        The image is converted to RGB, cropped to its bounding box when all
        four coordinates are present, and then passed through ``transform``
        if one was given.
        """
        row = self.annotations.iloc[idx]
        class_id = row['class_id']

        # convert() loads the pixel data, so the file can be closed right away.
        with Image.open(self._image_path(row)) as raw_image:
            image = raw_image.convert('RGB')

        # The original `if bbox:` tested a non-empty tuple and was always true;
        # skip the crop only when a coordinate is actually missing.
        bbox = tuple(row[column] for column in self._BBOX_COLUMNS)
        if not any(pd.isna(value) for value in bbox):
            image = image.crop(bbox)

        if self.transform:
            image = self.transform(image)
        return image, class_id

    def _generate_samples(self):
        """
        Helper function to generate a list of (image_path, class_id) tuples.
        """
        return [
            (self._image_path(row), row['class_id'])
            for _, row in self.annotations.iterrows()
        ]

    def generate_car_indax_image_dict(self):
        """
        Generate a dictionary of the form {class_id: [image_path1, image_path2, ...]}.
        """
        return self._group_paths_by('class_id')

    def generate_car_image_dict(self):
        """
        Generate a dictionary of car types and the images that belong to them.
        The dictionary will be of the form:
        {car_type_name: [image_path1, image_path2, ...]}
        """
        return self._group_paths_by('full_name')

    def display_image_from_class(self, class_id):
        """
        Given a class_id, this function displays one image from that car class.
        """
        car_type = self.idx_to_class.get(class_id, None)

        if not car_type:
            print(f"Unknown class_id: {class_id}")
            return

        car_image_paths = self.generate_car_image_dict().get(car_type, [])

        if not car_image_paths:
            print(f"No images found for class {car_type}")
            return

        # Show the first image listed for the class.
        img_path = car_image_paths[0]

        try:
            with Image.open(img_path) as image:
                plt.imshow(image)
            plt.axis('off')  # Hide axes
            plt.title(f"Class: {car_type}", fontsize=10)
            plt.show()
        except Exception as e:
            # Best-effort preview helper: report the problem instead of raising.
            print(f"Error opening image: {img_path}, Error: {e}")

    def print_number_of_images_in_class(self, class_id):
        """
        Given a class_id, this function prints the number of images in that car class.
        """
        car_type = self.idx_to_class.get(class_id, None)

        if not car_type:
            print(f"Unknown class_id: {class_id}")
            return
        car_image_paths = self.generate_car_image_dict().get(car_type, [])
        print(f"Number of images for class {car_type}: {len(car_image_paths)}")

    def crop_image_by_class_and_index(self, class_id, image_index):
        """
        Crop an image based on class_id and image_index.
        The image will be cropped using the bounding box coordinates (x_min, y_min, x_max, y_max).

        Returns the cropped image, or None (after printing a message) when the
        index is out of range, the file is missing, or the image cannot be read.
        Negative indices count from the end of the class, as with ``iloc``.
        """
        class_annotations = self.annotations[self.annotations['class_id'] == class_id]

        # Reject indices outside [-n, n); the original only checked the upper
        # bound, so a too-negative index raised IndexError from iloc.
        class_size = len(class_annotations)
        if not -class_size <= image_index < class_size:
            print(f"Invalid image index: {image_index} for class_id {class_id}")
            return None

        row = class_annotations.iloc[image_index]
        image_name = row["image_name"]
        bbox = tuple(row[column] for column in self._BBOX_COLUMNS)

        image_path = self._image_path(row)
        if not os.path.exists(image_path):
            print(f"Image not found: {image_name}")
            return None

        try:
            # crop() loads the pixel data, so the file can be closed on exit.
            with Image.open(image_path) as image:
                return image.crop(bbox)
        except Exception as e:
            # Best-effort helper: report the problem instead of raising.
            print(f"Error processing {image_name}: {e}")
            return None

    # Methods added for functionality similar to ImageFolder:
    @property
    def classes(self):
        """Returns the list of car types (class names)."""
        return list(self.idx_to_class.values())

    @property
    def class_to_idx(self):
        """Returns a dictionary of class names to class indices."""
        return self.class_to_idx1

    @property
    def samples(self):
        """Returns a list of (image_path, class_id) tuples."""
        return self.samples1

    @property
    def targets(self):
        """Returns a list of class_ids (targets) for each image."""
        return self.targets1

##Tests for the cardataset

In [None]:
# Per-channel statistics passed to Normalize. They are kept in variables so the
# preview below can invert the normalisation exactly.
# NOTE: the mean is 0.5 per channel while the std values are the ImageNet ones;
# the values are left unchanged so downstream training sees the same inputs.
norm_mean = [0.5, 0.5, 0.5]
norm_std = [0.229, 0.224, 0.225]
transform = transforms.Compose([
    transforms.Resize((128, 128)),  # Resize images to 128x128 for consistency
    transforms.ToTensor(),          # Convert images to tensor
    transforms.Normalize(mean=norm_mean, std=norm_std),
])
train_data = CarDataset(merged_data_train, '/content/stanford_cars/car_data/car_data/train', transform=transform)
type(train_data[1])
tensor, labl = train_data[1]

# Undo Normalize per channel (x = x_norm * std + mean). The previous
# `tensor * 0.5 + 0.5` assumed std=0.5 and produced wrong preview colours.
tensor = tensor * torch.tensor(norm_std).view(3, 1, 1) + torch.tensor(norm_mean).view(3, 1, 1)
tensor = tensor.clamp(0, 1)  # guard against float round-off outside imshow's [0, 1] range

# Convert tensor to numpy array
image = tensor.permute(1, 2, 0).numpy()  # Change shape to [H, W, C] for displaying
print(labl)
# Plot the image
plt.imshow(image)
plt.axis('off')  # Hide axis for better visualization
plt.show()

# Exercise the remaining public surface of CarDataset.
train_data.class_to_idx['Acura Integra Type R 2001']
train_data.classes
train_data.samples
train_data.targets
len(train_data)
train_data.crop_image_by_class_and_index(1,2)
train_data.display_image_from_class(6)
train_data.print_number_of_images_in_class(2)
train_data.generate_car_image_dict()
train_data._generate_samples()
train_data.display_image_from_class(66)

#DataLoader

In [None]:
"""
CarDataLoader

A custom DataLoader designed to work with the CarDataset or any compatible PyTorch Dataset.
This class provides batch loading, shuffling, and other advanced options for dataset management,
making it suitable for machine learning tasks.

Initialization Parameters:
--------------------------
dataset: Dataset
    The dataset from which to load the data. Should be compatible with the CarDataset class.
batch_size: int, optional (default=32)
    The number of samples per batch to load.
shuffle: bool, optional (default=True)
    Whether to shuffle the data at every epoch.
num_workers: int, optional (default=0)
    The number of subprocesses to use for data loading. If 0, data will be loaded in the main process.
collate_fn: callable, optional (default=None)
    A function to merge a list of samples into a batch. If None, a default function is used.
sampler: iterable, optional (default=None)
    A custom sampler that specifies the order in which data is loaded. If None, the default sampler is used.
drop_last: bool, optional (default=False)
    If True, drops the last incomplete batch.
pin_memory: bool, optional (default=False)
    If True, copies Tensors into CUDA pinned memory before returning them.
timeout: float, optional (default=0)
    The timeout value for collecting a batch.
worker_init_fn: callable, optional (default=None)
    A function to initialize each worker process.
prefetch_factor: int, optional (default=None)
    Number of samples loaded in advance by each worker.
persistent_workers: bool, optional (default=False)
    Whether to keep worker processes alive after the iterator is exhausted.
pin_memory_device: str, optional (default='')
    Device to pin memory for data transfer (e.g., "cuda").

Methods:
--------
__len__():
    Returns the number of batches in the dataset. Handles rounding for incomplete batches based on drop_last.

__iter__():
    Returns an iterator over batches of data. If shuffling is enabled, data indices are shuffled at the start.

default_collate_fn(batch_data):
    The default collate function that stacks images and labels into batches.
    Converts images into a stacked tensor and labels into a tensor. Handles pinning memory for CUDA if enabled.

Attributes:
-----------
dataset:
    The dataset object passed during initialization.
batch_size:
    Number of samples per batch.
shuffle:
    Indicates if the data should be shuffled.
num_workers:
    Number of subprocesses used for data loading.
collate_fn:
    Function used for batching data.
sampler:
    Custom sampler for determining the order of data loading.
indices:
    The list of data indices used for batching, which can be shuffled.
label_to_idx:
    Dictionary mapping car type names (or other dataset labels) to class indices.
drop_last:
    Whether to drop the last incomplete batch.
pin_memory:
    Whether to enable pinned memory for CUDA operations.
pin_memory_device:
    The device used for pinning memory if applicable.

Usage:
------
This DataLoader can be used in place of PyTorch's `DataLoader` for custom batching and preprocessing.
Example:
    >>> dataloader = CarDataLoader(car_dataset, batch_size=64, shuffle=True)
    >>> for images, labels in dataloader:
    >>>     # Use images and labels in training
"""
class CarDataLoader:
    """
    Minimal single-process batch loader for map-style datasets.

    Items are fetched with ``dataset[idx]`` in the main process and merged with
    ``collate_fn``. ``num_workers``, ``timeout``, ``worker_init_fn``,
    ``prefetch_factor``, ``persistent_workers`` and ``pin_memory_device`` are
    accepted for signature compatibility and stored, but are not otherwise
    used by this class.

    Raises:
        ValueError: if ``batch_size`` is not positive.
    """

    def __init__(self, dataset, batch_size=32, shuffle=True, num_workers=0, collate_fn=None, sampler=None, drop_last=False, pin_memory=False, timeout=0, worker_init_fn=None, prefetch_factor=None, persistent_workers=False, pin_memory_device=''):
        # A zero batch size previously failed later with ZeroDivisionError
        # (in __len__) or a range() error (in __iter__); fail early instead.
        if batch_size <= 0:
            raise ValueError(f"batch_size must be a positive integer, got {batch_size}")

        self.dataset = dataset
        self.batch_size = batch_size
        self.shuffle = shuffle
        self.num_workers = num_workers
        # NOTE: this instance attribute shadows the `collate_fn` method defined
        # below, so `loader.collate_fn` is the callable chosen here.
        self.collate_fn = collate_fn if collate_fn else self.default_collate_fn
        self.sampler = sampler
        self.drop_last = drop_last
        self.pin_memory = pin_memory
        self.timeout = timeout
        self.worker_init_fn = worker_init_fn
        self.prefetch_factor = prefetch_factor
        self.persistent_workers = persistent_workers
        self.pin_memory_device = pin_memory_device
        self.label_to_idx = self._find_class_to_idx(dataset)

        # Without a sampler, visit every index once (optionally shuffled);
        # otherwise materialise the sampler's order once.
        if self.sampler is None:
            self.indices = list(range(len(dataset)))
            if self.shuffle:
                random.shuffle(self.indices)
        else:
            self.indices = list(self.sampler)

    @staticmethod
    def _find_class_to_idx(dataset):
        """
        Return the first ``class_to_idx`` found on ``dataset`` or on the chain
        of wrapped ``.dataset`` attributes (e.g. a Subset), else None.

        The original lookup was tied to CarDataset and raised AttributeError
        for any other dataset that was not a wrapper.
        """
        current = dataset
        while current is not None:
            mapping = getattr(current, 'class_to_idx', None)
            if mapping is not None:
                return mapping
            current = getattr(current, 'dataset', None)
        return None

    def __len__(self):
        """Return the number of batches one pass over the indices yields."""
        full_batches, remainder = divmod(len(self.indices), self.batch_size)
        if self.drop_last or remainder == 0:
            return full_batches
        return full_batches + 1

    def __iter__(self):
        """Yield collated batches; indices are reshuffled first when shuffling without a sampler."""
        if self.sampler is None and self.shuffle:
            random.shuffle(self.indices)

        for start in range(0, len(self.indices), self.batch_size):
            batch_indices = self.indices[start:start + self.batch_size]

            # Only the final slice can be short; skip it when drop_last is set.
            if self.drop_last and len(batch_indices) < self.batch_size:
                continue

            batch_data = [self.dataset[idx] for idx in batch_indices]
            yield self.collate_fn(batch_data)

    def collate_fn(self, batch_data):
        """
        Stack (image, label) pairs into an image tensor and a label tensor.

        Only reachable through the class (``CarDataLoader.collate_fn``); on
        instances the attribute assigned in ``__init__`` takes precedence.
        """
        images, labels = zip(*batch_data)
        images = torch.stack(images)  # Stack images into a batch
        labels = torch.tensor(labels)  # Convert labels to a tensor
        return images, labels

    def default_collate_fn(self, batch_data):
        """Default collate function that stacks images and labels into batches."""
        images, labels = zip(*batch_data)

        images = torch.stack(images)  # Stack images into a batch
        labels = torch.tensor(labels)  # Convert labels to a tensor

        # Pin only the image batch, and only when CUDA is usable.
        if self.pin_memory and torch.cuda.is_available():
            images = images.pin_memory()

        return images, labels

##Test DataLoader

In [None]:
# Build a loader over the training set. num_workers is stored by CarDataLoader
# but its __iter__ fetches items in the main process.
train_data_ledor = CarDataLoader(train_data, batch_size=32, shuffle=True, num_workers=4, drop_last=True, pin_memory=True)
print(f"Number of batches: {len(train_data_ledor)}")

# Inspect the shapes of the first two batches only.
for batch_idx, (images, labels) in enumerate(train_data_ledor):
    print(f"Batch {batch_idx + 1}:")
    print(f" - Images shape: {images.shape}")
    print(f" - Labels shape: {labels.shape}")
    if batch_idx >= 1:
        break

# Collate five samples by hand. On an instance, `collate_fn` is the attribute
# set in __init__ (default_collate_fn here, since none was passed).
batch_data = [train_data[idx] for idx in range(5)]
images, labels = train_data_ledor.collate_fn(batch_data)

print(f"Images shape from collate_fn: {images.shape}")
print(f"Labels from collate_fn: {labels.shape}")
train_data_ledor.__len__()

#CNN Networks

In [None]:
class BasicBlock(nn.Module):
    """
    Two-convolution residual block: conv3x3-BN-ReLU, conv3x3-BN, add the
    shortcut, ReLU. ``downsample``, when given, is applied to the input to
    produce the shortcut branch.
    """

    expansion = 1

    def __init__(self, in_channels, out_channels, stride=1, downsample=None):
        super().__init__()
        conv3x3 = dict(kernel_size=3, padding=1, bias=False)
        # Only the first convolution may change the spatial resolution.
        self.conv1 = nn.Conv2d(in_channels, out_channels, stride=stride, **conv3x3)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = nn.Conv2d(out_channels, out_channels, stride=1, **conv3x3)
        self.bn2 = nn.BatchNorm2d(out_channels)
        self.downsample = downsample

    def forward(self, x):
        """Return ReLU(residual(x) + shortcut(x))."""
        shortcut = x if self.downsample is None else self.downsample(x)

        residual = self.relu(self.bn1(self.conv1(x)))
        residual = self.bn2(self.conv2(residual))

        residual += shortcut
        return self.relu(residual)


class ResNet(nn.Module):
    """
    ResNet-style classifier: a 7x7 stem with max pooling, four residual stages
    of widths 64/128/256/512 built from ``block``, global average pooling and
    a linear head with ``num_classes`` outputs. ``layers`` gives the number of
    blocks in each of the four stages.
    """

    def __init__(self, block, layers, num_classes=1000):
        super().__init__()
        # Channel count entering the next stage; updated by _make_layer.
        self.in_channels = 64

        # Stem
        self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3, bias=False)
        self.bn1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        # (width, stride) for layer1..layer4; only the first stage keeps resolution.
        stage_specs = ((64, 1), (128, 2), (256, 2), (512, 2))
        for position, (width, stride) in enumerate(stage_specs):
            stage = self._make_layer(block, width, layers[position], stride=stride)
            setattr(self, f"layer{position + 1}", stage)

        # Head
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(512 * block.expansion, num_classes)

    def _make_layer(self, block, out_channels, blocks, stride=1):
        """Build one stage of ``blocks`` blocks; the first one carries the stride."""
        stage_width = out_channels * block.expansion

        # A 1x1 projection is needed whenever the shortcut would not match the
        # residual branch in resolution or channel count.
        downsample = None
        if stride != 1 or self.in_channels != stage_width:
            downsample = nn.Sequential(
                nn.Conv2d(self.in_channels, stage_width,
                          kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(stage_width),
            )

        stage = [block(self.in_channels, out_channels, stride, downsample)]
        self.in_channels = stage_width
        stage.extend(block(self.in_channels, out_channels) for _ in range(1, blocks))

        return nn.Sequential(*stage)

    def forward(self, x):
        """Return the class scores for a batch of images."""
        stem = self.maxpool(self.relu(self.bn1(self.conv1(x))))

        features = stem
        for stage in (self.layer1, self.layer2, self.layer3, self.layer4):
            features = stage(features)

        pooled = torch.flatten(self.avgpool(features), 1)
        return self.fc(pooled)


def resnet34(num_classes=1000):
    """Build a ResNet of BasicBlocks with 3, 4, 6 and 3 blocks in its four stages."""
    blocks_per_stage = [3, 4, 6, 3]
    return ResNet(BasicBlock, blocks_per_stage, num_classes=num_classes)

In [None]:

class CifarCNN(nn.Module):
    """
    Plain convolutional classifier (196 classes by default).

    Three convolutional blocks, each ending in a 2x2 max pool, are followed by
    a three-layer fully connected head. The head's first layer expects
    256 * 16 * 16 features, i.e. 128x128 inputs after the three 2x poolings.
    """

    def __init__(self, num_classes=196):
        """Create the convolutional feature extractor and the classifier head."""
        super().__init__()

        unit = self._conv_unit
        pool = dict(kernel_size=2, stride=2)

        stack = []
        # Block 1: 3 -> 32 -> 64 channels, then halve the resolution.
        stack += unit(3, 32)
        stack += unit(32, 64)
        stack.append(nn.MaxPool2d(**pool))
        # Block 2: 64 -> 128 -> 128 channels, halve the resolution, spatial dropout.
        stack += unit(64, 128)
        stack += unit(128, 128)
        stack.append(nn.MaxPool2d(**pool))
        stack.append(nn.Dropout2d(p=0.2))
        # Block 3: 128 -> 256 -> 256 channels, halve the resolution once more.
        stack += unit(128, 256)
        stack += unit(256, 256)
        stack.append(nn.MaxPool2d(**pool))
        self.conv_layer = nn.Sequential(*stack)

        # Classifier head; the input width fixes the supported image size.
        head = [
            nn.Dropout(p=0.4),
            nn.Linear(256 * 16 * 16, 1024),
            nn.BatchNorm1d(1024),
            nn.LeakyReLU(inplace=True, negative_slope=0.01),
            nn.Linear(1024, 512),
            nn.BatchNorm1d(512),
            nn.LeakyReLU(inplace=True, negative_slope=0.01),
            nn.Dropout(p=0.5),
            nn.Linear(512, num_classes),
        ]
        self.fc_layer = nn.Sequential(*head)

    @staticmethod
    def _conv_unit(in_channels, out_channels):
        """Return [3x3 conv (padding 1), batch norm, LeakyReLU (used instead of ReLU)]."""
        return [
            nn.Conv2d(in_channels=in_channels, out_channels=out_channels, kernel_size=3, padding=1),
            nn.BatchNorm2d(out_channels),
            nn.LeakyReLU(inplace=True, negative_slope=0.01),
        ]

    def forward(self, x):
        """Return the class scores for a batch of images."""
        features = self.conv_layer(x)
        # One row of features per sample for the fully connected head.
        flat = features.view(features.size(0), -1)
        return self.fc_layer(flat)

In [None]:
class CarCNN(nn.Module):
    """
    Convolutional classifier with three conv blocks (64-128, 256-256, 512-512
    channels), each followed by a 2x2 max pool, and a three-layer fully
    connected head.

    Args:
        num_classes: number of output scores.
        input_size: expected image size as ``(height, width)`` or a single int
            for square images. It determines the width of the first fully
            connected layer. Defaults to 128x128, the size the original code
            assumed.
    """

    def __init__(self, num_classes, input_size=(128, 128)):
        super(CarCNN, self).__init__()

        # Stored on the instance: _initialize_fc_layer previously referenced a
        # bare `num_classes`, which raised NameError on construction.
        self.num_classes = num_classes
        if isinstance(input_size, int):
            input_size = (input_size, input_size)
        self.input_size = tuple(input_size)

        self.conv_layer = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size=3, padding=1),
            nn.BatchNorm2d(64),
            nn.LeakyReLU(0.01),
            nn.Conv2d(64, 128, kernel_size=3, padding=1),
            nn.BatchNorm2d(128),
            nn.LeakyReLU(0.01),
            nn.MaxPool2d(2),
            nn.Conv2d(128, 256, kernel_size=3, padding=1),
            nn.BatchNorm2d(256),
            nn.LeakyReLU(0.01),
            nn.Conv2d(256, 256, kernel_size=3, padding=1),
            nn.BatchNorm2d(256),
            nn.LeakyReLU(0.01),
            nn.MaxPool2d(2),
            nn.Dropout2d(0.2),
            nn.Conv2d(256, 512, kernel_size=3, padding=1),
            nn.BatchNorm2d(512),
            nn.LeakyReLU(0.01),
            nn.Conv2d(512, 512, kernel_size=3, padding=1),
            nn.BatchNorm2d(512),
            nn.LeakyReLU(0.01),
            nn.MaxPool2d(2),
        )

        # Size the fully connected head from the conv output.
        self._initialize_fc_layer()

    def _initialize_fc_layer(self):
        """Create ``fc_layer`` with an input width measured from the conv stack."""
        # Probe in eval mode and without autograd so the dummy pass neither
        # updates BatchNorm running statistics nor builds a graph.
        was_training = self.conv_layer.training
        self.conv_layer.eval()
        with torch.no_grad():
            dummy_input = torch.zeros(1, 3, *self.input_size)
            flattened_size = self.conv_layer(dummy_input).view(1, -1).size(1)
        self.conv_layer.train(was_training)

        # Use the measured size (512 * 16 * 16 for 128x128 inputs) rather than
        # a hard-coded constant.
        self.fc_layer = nn.Sequential(
            nn.Dropout(0.4),
            nn.Linear(flattened_size, 2048),
            nn.BatchNorm1d(2048),
            nn.LeakyReLU(0.01),
            nn.Linear(2048, 1024),
            nn.BatchNorm1d(1024),
            nn.LeakyReLU(0.01),
            nn.Dropout(0.5),
            nn.Linear(1024, self.num_classes),
        )

    def forward(self, x):
        """Return the class scores for a batch of ``input_size`` images."""
        x = self.conv_layer(x)
        x = x.view(x.size(0), -1)  # Flatten the tensor

        x = self.fc_layer(x)
        return x


#Preparing to run the model

In [None]:
# Use CUDA when PyTorch reports it as available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
device

In [None]:
# Ask PyTorch's CUDA caching allocator to release memory it holds but is not using.
torch.cuda.empty_cache()

##smaller data

In [None]:
# Build small subsets with at most `samples_per_class` random images per class.
# Shuffling the rows and taking the first rows of each group keeps every column
# (including class_id) on all pandas versions. groupby(...).apply(...) over the
# grouping column is deprecated and drops class_id on newer pandas, which would
# break the class_id filter below.
samples_per_class = 10
excluded_class_id = 196

imited_annotations_10_train = (
    merged_data_train.sample(frac=1)
    .groupby('class_id')
    .head(samples_per_class)
    .sort_values('class_id', kind='stable')  # same class-by-class layout as before
    .reset_index(drop=True)
)
filtered_annotations_train = imited_annotations_10_train[imited_annotations_10_train['class_id'] != excluded_class_id]

imited_annotations_10_test = (
    merged_data_test.sample(frac=1)
    .groupby('class_id')
    .head(samples_per_class)
    .sort_values('class_id', kind='stable')
    .reset_index(drop=True)
)
filtered_annotations_test = imited_annotations_10_test[imited_annotations_10_test['class_id'] != excluded_class_id]
filtered_annotations_test

# Accumulator for the per-model summary rows.
results_df = pd.DataFrame(columns=["Model", "Test Accuracy", "Test Loss", "Training Time (s)"])

#Cutting down to 195 categories (dropping class 196)

In [None]:
# Keep every row whose class id differs from the excluded one.
excluded_class_id = 196
train_whit195 = merged_data_train.loc[merged_data_train['class_id'].ne(excluded_class_id)]
test_whit195 = merged_data_test.loc[merged_data_test['class_id'].ne(excluded_class_id)]
test_whit195

#First run

In [None]:
def log_final_results(model_name, test_accuracy, test_loss, training_time, results_df,
                      csv_path="all_model_results.csv"):
    """
    Record the final metrics of one model run.

    The new row is appended to the CSV file at ``csv_path`` (created when
    missing or empty) and to a copy of ``results_df``.

    Args:
        model_name: label stored in the "Model" column.
        test_accuracy: value stored in the "Test Accuracy" column.
        test_loss: value stored in the "Test Loss" column.
        training_time: value stored in the "Training Time (s)" column.
        results_df: in-memory results table, or None. It is not modified.
        csv_path: file that accumulates the rows of all runs.

    Returns:
        A new DataFrame holding the rows of ``results_df`` plus the new row.
        The original returned nothing, so the caller's table was never
        updated; assign the result back: ``results_df = log_final_results(...)``.
    """
    new_df = pd.DataFrame([{
        "Model": model_name,
        "Test Accuracy": test_accuracy,
        "Test Loss": test_loss,
        "Training Time (s)": training_time,
    }])

    # Skip the concat for an empty table: concatenating empty frames is
    # deprecated dtype-wise in recent pandas and adds nothing.
    if results_df is None or results_df.empty:
        updated_df = new_df
    else:
        updated_df = pd.concat([results_df, new_df], ignore_index=True)

    # Only the new row goes to disk. Writing the whole in-memory table again
    # would duplicate rows that an earlier call already stored in the file.
    try:
        history_df = pd.read_csv(csv_path)
    except (FileNotFoundError, pd.errors.EmptyDataError):
        on_disk_df = new_df
    else:
        on_disk_df = pd.concat([history_df, new_df], ignore_index=True)

    on_disk_df.to_csv(csv_path, index=False)
    return updated_df



def train_model(model, train_dl, test_dl, loss_fn, optimizer, scheduler, n_epochs=5):
    """
    Train ``model`` for ``n_epochs`` epochs, evaluating on ``test_dl`` after each.

    Labels coming from the loaders are expected to be 1-based and are shifted
    to 0-based before the loss is computed. The weights with the best test
    accuracy so far are saved to 'best_model.pt'.

    Args:
        model: network to train; it is moved to CUDA when available.
        train_dl: iterable of (inputs, labels) training batches.
        test_dl: loader passed to ``eval_model`` after every epoch.
        loss_fn: loss callable; its ``reduction`` attribute ('mean' when
            absent) decides how batch losses are averaged per sample.
        optimizer: optimizer over the model's parameters.
        scheduler: learning-rate scheduler or None. ReduceLROnPlateau is
            stepped with the test accuracy, any other scheduler once per epoch
            without arguments.
        n_epochs: number of passes over ``train_dl``.

    Returns:
        (model, train_losses, train_accuracies, test_losses, test_accuracies),
        with one list entry per epoch; accuracies are percentages.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    # Inputs are moved to `device` below, so the model has to live there too.
    model.to(device)

    # With reduction='sum' a batch loss is already a sum over samples; with
    # 'mean' it has to be re-weighted by the batch size to average per sample.
    loss_is_batch_sum = getattr(loss_fn, "reduction", "mean") == "sum"

    losses = []
    accuracies = []
    test_losses = []
    test_accuracies = []

    best_test_acc = 0
    for epoch in tqdm.tqdm(range(1, n_epochs+1)):
        model.train()
        running_loss = 0.0
        running_correct = 0
        samples_seen = 0

        for inputs, labels in train_dl:
            inputs = inputs.to(device)
            labels = labels.to(device) - 1  # 1-based class ids -> 0-based targets

            optimizer.zero_grad()
            outputs = model(inputs)
            loss = loss_fn(outputs, labels)
            loss.backward()
            optimizer.step()

            batch_size = labels.size(0)
            batch_loss = loss.item()
            running_loss += batch_loss if loss_is_batch_sum else batch_loss * batch_size
            running_correct += (outputs.argmax(dim=-1) == labels).sum().item()
            samples_seen += batch_size

        # Average over the samples actually processed. The original divided a
        # sum of batch means by len(dataset), which under-reports the loss and
        # miscounts when the loader drops the last batch.
        denominator = max(samples_seen, 1)
        epoch_loss = running_loss / denominator
        epoch_acc = running_correct / denominator * 100.0

        logging.info(f"Epoch {epoch}/{n_epochs}, Train Loss: {epoch_loss:.4f}, Train Accuracy: {epoch_acc:.4f}%")

        losses.append(epoch_loss)
        accuracies.append(epoch_acc)

        model.eval()  # Switch to eval mode to evaluate on test data
        test_loss, test_acc = eval_model(model, test_dl, loss_fn)
        test_losses.append(test_loss)
        test_accuracies.append(test_acc)

        if test_acc > best_test_acc:
            best_test_acc = test_acc
            logging.info('Model saved.')
            torch.save(model.state_dict(), 'best_model.pt')

        # Only ReduceLROnPlateau takes a metric; passing the accuracy to other
        # schedulers would be interpreted as an epoch number.
        if scheduler is not None:
            if isinstance(scheduler, ReduceLROnPlateau):
                scheduler.step(test_acc)
            else:
                scheduler.step()

    model.train()  # Leave the model in train mode, as before
    print('Finished Training')
    return model, losses, accuracies, test_losses, test_accuracies


def eval_model(model, test_dl, loss_fn):
    """Score ``model`` on ``test_dl`` without updating any weights.

    Labels are shifted by -1 before use (this assumes the loader yields
    1-based class ids -- TODO confirm against the dataset class). The model is
    put in eval mode for the duration of the call and its previous
    train/eval mode is restored afterwards.

    Args:
        model (torch.nn.Module): Network to evaluate.
        test_dl: Iterable of ``(images, labels)`` batches.
        loss_fn: Loss callable taking ``(outputs, labels)``.

    Returns:
        tuple: ``(test_loss, test_acc)`` -- mean per-sample loss and accuracy
        in percent. Both are 0.0 when the loader yields no samples.
    """
    # Follow the model's own device instead of guessing from CUDA availability.
    first_param = next(model.parameters(), None)
    if first_param is not None:
        device = first_param.device
    else:
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # With reduction='sum' the batch loss is already a per-batch total;
    # otherwise it is a batch mean and must be re-weighted by the batch size.
    loss_is_batch_mean = getattr(loss_fn, "reduction", "mean") != "sum"

    correct = 0
    total = 0
    loss_total = 0.0

    was_training = model.training
    model.eval()
    try:
        with torch.no_grad():
            for images, labels in test_dl:
                images = images.to(device)
                labels = labels.to(device) - 1
                outputs = model(images)

                batch_size = labels.size(0)
                batch_loss = loss_fn(outputs, labels).item()
                if loss_is_batch_mean:
                    batch_loss *= batch_size
                loss_total += batch_loss

                predicted = torch.argmax(outputs, dim=-1)
                correct += (predicted == labels).sum().item()
                total += batch_size
    finally:
        model.train(was_training)

    denom = max(total, 1)  # avoid dividing by zero on an empty loader
    test_acc = 100.0 * correct / denom
    test_loss = loss_total / denom

    logging.info(f'Test Loss: {test_loss:.4f}, Test Accuracy: {test_acc:.4f}%')

    return test_loss, test_acc

def plot_metrics(metrics, title, xlabel, ylabel, legend_labels, save_path=None):
    """
    Draw several metric series on the current figure and display it.

    Each series in ``metrics`` is plotted as one line, labelled with the
    entry of ``legend_labels`` at the same position. When ``save_path`` is
    truthy the figure is written there before it is shown.
    """
    labelled_series = zip(metrics, legend_labels)
    for series, series_label in labelled_series:
        plt.plot(series, label=series_label)

    # Figure decorations
    plt.title(title)
    plt.xlabel(xlabel)
    plt.ylabel(ylabel)
    plt.legend()

    wants_file = bool(save_path)
    if wants_file:
        plt.savefig(save_path)
    plt.show()



def evaluate_model_on_test(model, test_data, criterion, device, batch_size=32):
    """
    Evaluate the model on the test dataset and plot the results.

    Labels are shifted by -1 before use (this assumes the loader yields
    1-based class ids -- TODO confirm against the dataset class). The model is
    left in eval mode when the function returns.

    Args:
        model (torch.nn.Module): Trained model to evaluate.
        test_data (Dataset): Dataset for testing.
        criterion (Loss): Loss function to compute test loss.
        device (torch.device): Device to run computations (CPU or GPU).
        batch_size (int): Batch size for the CarDataLoader.

    Returns:
        float, float: Mean per-sample test loss and test accuracy. Accuracy is
        a fraction in [0, 1], not a percentage. Both are 0.0 when the dataset
        yields no samples.
    """
    # Create DataLoader
    test_loader = CarDataLoader(test_data, batch_size=batch_size, shuffle=False, num_workers=4)

    # With reduction='sum' the batch loss is already a per-batch total;
    # otherwise it is a batch mean and must be re-weighted by the batch size
    # so that dividing by the sample count yields a true per-sample mean.
    loss_is_batch_mean = getattr(criterion, "reduction", "mean") != "sum"

    loss_sum = 0.0
    correct = 0
    total = 0

    # Set model to evaluation mode
    model.eval()

    # Disable gradient calculation
    with torch.no_grad():
        for images, labels in test_loader:
            # Move to device and shift labels to 0-based ids
            images = images.to(device)
            labels = labels.to(device) - 1

            # Forward pass
            outputs = model(images)

            # Accumulate the loss as a per-batch total
            batch_size_seen = labels.size(0)
            batch_loss = criterion(outputs, labels).item()
            if loss_is_batch_mean:
                batch_loss *= batch_size_seen
            loss_sum += batch_loss

            # Count correct predictions
            predicted = torch.argmax(outputs, dim=1)
            correct += (predicted == labels).sum().item()
            total += batch_size_seen

    # Calculate metrics (guard against an empty dataset)
    denom = max(total, 1)
    avg_test_loss = loss_sum / denom
    accuracy = correct / denom

    # Print results
    print(f"Test Loss: {avg_test_loss:.4f}")
    print(f"Test Accuracy: {accuracy * 100:.2f}%")

    # Plotting results: a single evaluation pass gives one point per metric
    test_losses = [avg_test_loss]
    accuracies = [accuracy]
    plt.figure(figsize=(8, 5))
    plt.plot(range(1, len(test_losses) + 1), test_losses, marker='o', label='Test Loss')
    plt.plot(range(1, len(accuracies) + 1), [acc * 100 for acc in accuracies], marker='s', label='Test Accuracy')
    plt.title("Test Loss and Accuracy")
    plt.xlabel("Epochs")
    plt.ylabel("Value")
    plt.legend()
    plt.grid()
    plt.show()

    return avg_test_loss, accuracy



In [None]:
def main():
    """Cross-validate the baseline CifarCNN on the 195-class split, then test it.

    Runs 5-fold cross-validation with a fresh model, Adam optimizer and StepLR
    scheduler per fold, plots per-fold loss and validation-accuracy curves,
    evaluates the last fold's model on the test set, passes the test metrics
    to ``log_final_results`` and saves the model weights.

    Side effects: rebinds the module-level ``results_df`` to this run's
    per-fold results table and writes ``CifarCNN_195_final_model.pth``.
    """
    global results_df
    model_name = "CifarCNN_195"
    num_classes = 195
    k_folds = 5
    num_epochs = 10
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    transform = transforms.Compose([
        transforms.Resize((128, 128)),  # Resize to 128x128 for uniformity
        transforms.ToTensor(),  # Convert to Tensor
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),  # Normalize
    ])
    dataset = CarDataset(train_whit195, '/content/stanford_cars/car_data/car_data/train', transform=transform)

    # K-Fold Cross Validation setup
    kfold = KFold(n_splits=k_folds, shuffle=True, random_state=42)
    start_time = time.time()

    # Per-fold bookkeeping
    results_table = []
    all_train_losses = []
    all_val_losses = []
    all_val_accuracies = []

    # K-fold loop
    for fold, (train_ids, val_ids) in enumerate(kfold.split(dataset)):
        print(f'Fold {fold+1}/{k_folds}')

        # Subset the dataset for training and validation
        train_loader = CarDataLoader(Subset(dataset, train_ids), batch_size=32, shuffle=True, num_workers=4)
        val_loader = CarDataLoader(Subset(dataset, val_ids), batch_size=32, shuffle=False, num_workers=4)

        # Every fold starts from a fresh model, optimizer, loss and scheduler
        # so that nothing learned on one fold leaks into the next.
        model = CifarCNN(num_classes=num_classes).to(device)
        optimizer = Adam(model.parameters(), lr=0.001)
        loss_fn = CrossEntropyLoss()
        scheduler = StepLR(optimizer, step_size=5, gamma=0.1)

        # Train and evaluate the model
        model, train_losses, train_accuracies, val_losses, val_accuracies = train_model(
            model, train_loader, val_loader, loss_fn, optimizer, scheduler, n_epochs=num_epochs
        )
        results_table.append({
            "Model Name": model_name,
            "Fold": fold + 1,
            "Train Loss": train_losses[-1],
            "Validation Loss": val_losses[-1],
            "Validation Accuracy": val_accuracies[-1],
            "Test Loss": None,
            "Test Accuracy": None
        })

        # Keep the full curves for plotting
        all_train_losses.append(train_losses)
        all_val_losses.append(val_losses)
        all_val_accuracies.append(val_accuracies)

    # Average the final-epoch validation metrics across all folds
    avg_loss = sum(row["Validation Loss"] for row in results_table) / k_folds
    avg_accuracy = sum(row["Validation Accuracy"] for row in results_table) / k_folds

    print(f'\nAverage Loss: {avg_loss:.4f}, Average Accuracy: {avg_accuracy:.4f}')

    # 1. Train vs. validation loss per fold
    for fold in range(k_folds):
        plot_metrics(
            [all_train_losses[fold], all_val_losses[fold]],
            title=f'Fold {fold+1} Loss',
            xlabel='Epochs',
            ylabel='Loss',
            legend_labels=['Train Loss', 'Validation Loss']
        )

    # 2. Validation accuracy per fold
    for fold in range(k_folds):
        plot_metrics(
            [all_val_accuracies[fold]],
            title=f'Fold {fold+1} Validation Accuracy',
            xlabel='Epochs',
            ylabel='Accuracy (%)',
            legend_labels=['Validation Accuracy']
        )

    end_time = time.time()
    total_training_time = end_time - start_time

    # The model evaluated and saved below is the one trained on the last fold.
    test_dataset = CarDataset(test_whit195, '/content/stanford_cars/car_data/car_data/test', transform=transform)
    test_loss, test_accuracy = evaluate_model_on_test(model, test_dataset, loss_fn, device)
    log_final_results(model_name, test_accuracy, test_loss, total_training_time, results_df)

    model_save_path = f"{model_name}_final_model.pth"
    torch.save(model.state_dict(), model_save_path)
    print(f"Model saved at {model_save_path}")

    results_df = pd.DataFrame(results_table)
    print(f"\nFinal Test Loss: {test_loss:.4f}")
    print(f"Final Test Accuracy: {test_accuracy * 100:.2f}%")


if __name__ == '__main__':
    main()

In [None]:
# Resume from the checkpointed results table when one exists on disk;
# otherwise start with an empty table that has the expected columns.
results_df = (
    pd.read_csv("results_checkpoint.csv")
    if os.path.exists("results_checkpoint.csv")
    else pd.DataFrame(
        columns=[
            "Model Name",
            "Fold",
            "Train Loss",
            "Validation Loss",
            "Validation Accuracy",
            "Test Loss",
            "Test Accuracy",
        ]
    )
)


In [None]:
# Release cached GPU memory that PyTorch is no longer using before the next cell.
torch.cuda.empty_cache()

c

#Learning Rate Scheduler

In [None]:
def main():
    """Cross-validate CifarCNN with a ReduceLROnPlateau schedule, then test it.

    Runs 5-fold cross-validation on the 195-class split. Each fold gets a
    fresh model, an Adam optimizer with weight decay and a plateau scheduler
    bound to that optimizer. Per-fold results are checkpointed to
    ``partial_results.csv`` after every fold. Afterwards the per-fold curves
    are plotted, the last fold's model is evaluated on the test set, the test
    metrics are passed to ``log_final_results`` and the weights are saved.

    Side effects: rebinds the module-level ``results_df`` to this run's
    per-fold results table and writes ``partial_results.csv`` and
    ``CifarCNN_improv1_195_final_model.pth``.
    """
    global results_df
    model_name = "CifarCNN_improv1_195"
    num_classes = 195
    k_folds = 5
    num_epochs = 10
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    transform = transforms.Compose([
        transforms.Resize((128, 128)),  # Resize to 128x128 for uniformity
        transforms.ToTensor(),  # Convert to Tensor
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),  # Normalize
    ])
    dataset = CarDataset(train_whit195, '/content/stanford_cars/car_data/car_data/train', transform=transform)

    # K-Fold Cross Validation setup
    kfold = KFold(n_splits=k_folds, shuffle=True, random_state=42)
    start_time = time.time()

    # Per-fold bookkeeping
    results_table = []
    all_train_losses = []
    all_val_losses = []
    all_val_accuracies = []

    # K-fold loop
    for fold, (train_ids, val_ids) in enumerate(kfold.split(dataset)):
        print(f'Fold {fold+1}/{k_folds}')

        # Subset the dataset for training and validation
        train_loader = CarDataLoader(Subset(dataset, train_ids), batch_size=32, shuffle=True, num_workers=2)
        val_loader = CarDataLoader(Subset(dataset, val_ids), batch_size=32, shuffle=False, num_workers=2)

        # Every fold starts from a fresh model, optimizer and loss.
        model = CifarCNN(num_classes=num_classes).to(device)
        optimizer = Adam(model.parameters(), lr=0.001, weight_decay=0.0001)
        loss_fn = CrossEntropyLoss()

        # The scheduler must wrap THIS fold's optimizer: one created outside
        # the loop would adjust an optimizer that never updates the model
        # being trained. train_model steps it with validation accuracy, so
        # the plateau is monitored in 'max' mode.
        scheduler = ReduceLROnPlateau(optimizer, 'max', patience=2, factor=0.5)

        # Train and evaluate the model
        model, train_losses, train_accuracies, val_losses, val_accuracies = train_model(
            model, train_loader, val_loader, loss_fn, optimizer, scheduler, n_epochs=num_epochs
        )
        results_table.append({
            "Model Name": model_name,
            "Fold": fold + 1,
            "Train Loss": train_losses[-1],
            "Validation Loss": val_losses[-1],
            "Validation Accuracy": val_accuracies[-1],
            "Test Loss": None,
            "Test Accuracy": None
        })

        # Keep the full curves for plotting
        all_train_losses.append(train_losses)
        all_val_losses.append(val_losses)
        all_val_accuracies.append(val_accuracies)

        print(f"Fold {fold + 1}/{k_folds} - Train Loss: {train_losses[-1]}, Val Loss: {val_losses[-1]}")
        # Checkpoint after every fold so a crash does not lose finished folds
        pd.DataFrame(results_table).to_csv("partial_results.csv", index=False)

    # Average the final-epoch validation metrics across all folds
    avg_loss = sum(row["Validation Loss"] for row in results_table) / k_folds
    avg_accuracy = sum(row["Validation Accuracy"] for row in results_table) / k_folds

    print(f'\nAverage Loss: {avg_loss:.4f}, Average Accuracy: {avg_accuracy:.4f}')

    # 1. Train vs. validation loss per fold
    for fold in range(k_folds):
        plot_metrics(
            [all_train_losses[fold], all_val_losses[fold]],
            title=f'Fold {fold+1} Loss',
            xlabel='Epochs',
            ylabel='Loss',
            legend_labels=['Train Loss', 'Validation Loss']
        )

    # 2. Validation accuracy per fold
    for fold in range(k_folds):
        plot_metrics(
            [all_val_accuracies[fold]],
            title=f'Fold {fold+1} Validation Accuracy',
            xlabel='Epochs',
            ylabel='Accuracy (%)',
            legend_labels=['Validation Accuracy']
        )

    end_time = time.time()
    total_training_time = end_time - start_time

    # The model evaluated and saved below is the one trained on the last fold.
    test_dataset = CarDataset(test_whit195, '/content/stanford_cars/car_data/car_data/test', transform=transform)
    test_loss, test_accuracy = evaluate_model_on_test(model, test_dataset, loss_fn, device)
    log_final_results(model_name, test_accuracy, test_loss, total_training_time, results_df)

    model_save_path = f"{model_name}_final_model.pth"
    torch.save(model.state_dict(), model_save_path)
    print(f"Model saved at {model_save_path}")

    results_df = pd.DataFrame(results_table)
    print(f"\nFinal Test Loss: {test_loss:.4f}")
    print(f"Final Test Accuracy: {test_accuracy * 100:.2f}%")


if __name__ == '__main__':
    main()



In [None]:
# Release cached GPU memory that PyTorch is no longer using before the next cell.
torch.cuda.empty_cache()

#Gradient Clipping

In [None]:
def train_model(model, train_dl, test_dl, loss_fn, optimizer, scheduler, n_epochs=5):
    """Train ``model`` with gradient-norm clipping, validating after every epoch.

    Labels from both loaders are shifted by -1 before use (this assumes the
    loaders yield 1-based class ids -- TODO confirm against the dataset class).
    Gradients are clipped to a total norm of 1.0 before each optimizer step.
    After each epoch the model is scored with ``eval_model`` and its weights
    are written to ``best_model.pt`` whenever validation accuracy improves.

    Args:
        model (torch.nn.Module): Network to train; it must already live on
            the device it should train on.
        train_dl: Iterable of ``(inputs, labels)`` training batches.
        test_dl: Iterable of ``(inputs, labels)`` validation batches.
        loss_fn: Loss callable taking ``(outputs, labels)``.
        optimizer: Optimizer updating ``model``'s parameters.
        scheduler: Learning-rate scheduler stepped once per epoch, or None.
        n_epochs (int): Number of training epochs.

    Returns:
        tuple: ``(model, losses, accuracies, test_losses, test_accuracies)``
        where the four lists hold one value per epoch: mean per-sample
        training loss, training accuracy in percent, and the loss/accuracy
        pair returned by ``eval_model``.
    """
    # Follow the model's own device instead of guessing from CUDA availability,
    # so a CPU model on a CUDA machine does not hit a device mismatch.
    first_param = next(model.parameters(), None)
    if first_param is not None:
        device = first_param.device
    else:
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # With reduction='sum' the batch loss is already a per-batch total;
    # otherwise it is a batch mean and must be re-weighted by the batch size.
    loss_is_batch_mean = getattr(loss_fn, "reduction", "mean") != "sum"

    losses = []
    accuracies = []
    test_losses = []
    test_accuracies = []

    model.train()  # Set the model to train mode initially
    best_test_acc = 0
    for epoch in tqdm.tqdm(range(1, n_epochs + 1)):
        running_loss = 0.0
        running_correct = 0
        seen = 0

        for inputs, labels in train_dl:
            inputs = inputs.to(device)
            labels = labels.to(device) - 1  # Adjust labels to 0-based ids

            optimizer.zero_grad()
            outputs = model(inputs)
            predicted = torch.argmax(outputs, dim=-1)
            loss = loss_fn(outputs, labels)
            loss.backward()

            # Apply gradient clipping to prevent exploding gradients
            torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)

            optimizer.step()

            batch_size = labels.size(0)
            batch_loss = loss.item()
            if loss_is_batch_mean:
                batch_loss *= batch_size
            running_loss += batch_loss
            running_correct += (labels == predicted).sum().item()
            seen += batch_size

        # Normalise by the samples actually processed (robust to drop_last
        # and to an empty loader).
        denom = max(seen, 1)
        epoch_loss = running_loss / denom
        epoch_acc = running_correct / denom * 100.0

        logging.info(f"Epoch {epoch}/{n_epochs}, Train Loss: {epoch_loss:.4f}, Train Accuracy: {epoch_acc:.4f}%")

        losses.append(epoch_loss)
        accuracies.append(epoch_acc)

        model.eval()  # Switch to eval mode to evaluate on validation data
        test_loss, test_acc = eval_model(model, test_dl, loss_fn)
        test_losses.append(test_loss)
        test_accuracies.append(test_acc)

        if test_acc > best_test_acc:
            best_test_acc = test_acc
            logging.info('Model saved.')
            torch.save(model.state_dict(), 'best_model.pt')

        model.train()  # Switch back to train mode after validation

        # Plateau schedulers need the metric that matches their mode; every
        # other scheduler is stepped without arguments (a positional value
        # would be interpreted as an epoch index).
        if isinstance(scheduler, ReduceLROnPlateau):
            scheduler.step(test_acc if scheduler.mode == "max" else test_loss)
        elif scheduler is not None:
            scheduler.step()

    print('Finished Training')
    return model, losses, accuracies, test_losses, test_accuracies




def main():
    """Cross-validate CifarCNN with LR scheduling ("improv2" run), then test it.

    Runs 5-fold cross-validation on the 195-class split. Each fold gets a
    fresh model, an Adam optimizer with weight decay and a plateau scheduler
    bound to that optimizer; training itself is delegated to ``train_model``.
    Afterwards the per-fold curves are plotted, the last fold's model is
    evaluated on the test set, the test metrics are passed to
    ``log_final_results`` and the weights are saved.

    Side effects: rebinds the module-level ``results_df`` to this run's
    per-fold results table and writes
    ``CifarCNN_improv2_195_final_model.pth``.
    """
    global results_df
    model_name = "CifarCNN_improv2_195"
    num_classes = 195
    k_folds = 5
    num_epochs = 10
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    transform = transforms.Compose([
        transforms.Resize((128, 128)),  # Resize to 128x128 for uniformity
        transforms.ToTensor(),  # Convert to Tensor
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),  # Normalize
    ])
    dataset = CarDataset(train_whit195, '/content/stanford_cars/car_data/car_data/train', transform=transform)

    # K-Fold Cross Validation setup
    kfold = KFold(n_splits=k_folds, shuffle=True, random_state=42)
    start_time = time.time()

    # Per-fold bookkeeping
    results_table = []
    all_train_losses = []
    all_val_losses = []
    all_val_accuracies = []

    # K-fold loop
    for fold, (train_ids, val_ids) in enumerate(kfold.split(dataset)):
        print(f'Fold {fold+1}/{k_folds}')

        # Subset the dataset for training and validation
        train_loader = CarDataLoader(Subset(dataset, train_ids), batch_size=32, shuffle=True, num_workers=4)
        val_loader = CarDataLoader(Subset(dataset, val_ids), batch_size=32, shuffle=False, num_workers=4)

        # Every fold starts from a fresh model, optimizer and loss.
        model = CifarCNN(num_classes=num_classes).to(device)
        optimizer = Adam(model.parameters(), lr=0.001, weight_decay=0.0001)
        loss_fn = CrossEntropyLoss()

        # The scheduler must wrap THIS fold's optimizer: one created outside
        # the loop would adjust an optimizer that never updates the model
        # being trained. train_model steps it with validation accuracy, so
        # the plateau is monitored in 'max' mode.
        scheduler = ReduceLROnPlateau(optimizer, 'max', patience=2, factor=0.5)

        # Train and evaluate the model
        model, train_losses, train_accuracies, val_losses, val_accuracies = train_model(
            model, train_loader, val_loader, loss_fn, optimizer, scheduler, n_epochs=num_epochs
        )
        results_table.append({
            "Model Name": model_name,
            "Fold": fold + 1,
            "Train Loss": train_losses[-1],
            "Validation Loss": val_losses[-1],
            "Validation Accuracy": val_accuracies[-1],
            "Test Loss": None,
            "Test Accuracy": None
        })

        # Keep the full curves for plotting
        all_train_losses.append(train_losses)
        all_val_losses.append(val_losses)
        all_val_accuracies.append(val_accuracies)

    # Average the final-epoch validation metrics across all folds
    avg_loss = sum(row["Validation Loss"] for row in results_table) / k_folds
    avg_accuracy = sum(row["Validation Accuracy"] for row in results_table) / k_folds

    print(f'\nAverage Loss: {avg_loss:.4f}, Average Accuracy: {avg_accuracy:.4f}')

    # 1. Train vs. validation loss per fold
    for fold in range(k_folds):
        plot_metrics(
            [all_train_losses[fold], all_val_losses[fold]],
            title=f'Fold {fold+1} Loss',
            xlabel='Epochs',
            ylabel='Loss',
            legend_labels=['Train Loss', 'Validation Loss']
        )

    # 2. Validation accuracy per fold
    for fold in range(k_folds):
        plot_metrics(
            [all_val_accuracies[fold]],
            title=f'Fold {fold+1} Validation Accuracy',
            xlabel='Epochs',
            ylabel='Accuracy (%)',
            legend_labels=['Validation Accuracy']
        )

    end_time = time.time()
    total_training_time = end_time - start_time

    # The model evaluated and saved below is the one trained on the last fold.
    test_dataset = CarDataset(test_whit195, '/content/stanford_cars/car_data/car_data/test', transform=transform)
    test_loss, test_accuracy = evaluate_model_on_test(model, test_dataset, loss_fn, device)
    log_final_results(model_name, test_accuracy, test_loss, total_training_time, results_df)

    model_save_path = f"{model_name}_final_model.pth"
    torch.save(model.state_dict(), model_save_path)
    print(f"Model saved at {model_save_path}")

    results_df = pd.DataFrame(results_table)
    print(f"\nFinal Test Loss: {test_loss:.4f}")
    print(f"Final Test Accuracy: {test_accuracy * 100:.2f}%")


if __name__ == '__main__':
    main()


In [None]:
torch.cuda.empty_cache()  # GPU memory cleanup: release cached blocks PyTorch is no longer using

#d

In [None]:
def train_model(model, train_dl, test_dl, loss_fn, optimizer, scheduler, n_epochs=5):
    """Train ``model`` with gradient-norm clipping, validating after every epoch.

    Labels from both loaders are shifted by -1 before use (this assumes the
    loaders yield 1-based class ids -- TODO confirm against the dataset class).
    Gradients are clipped to a total norm of 1.0 before each optimizer step.
    After each epoch the model is scored with ``eval_model`` and its weights
    are written to ``best_model.pt`` whenever validation accuracy improves.

    Args:
        model (torch.nn.Module): Network to train; it must already live on
            the device it should train on.
        train_dl: Iterable of ``(inputs, labels)`` training batches.
        test_dl: Iterable of ``(inputs, labels)`` validation batches.
        loss_fn: Loss callable taking ``(outputs, labels)``.
        optimizer: Optimizer updating ``model``'s parameters.
        scheduler: Learning-rate scheduler stepped once per epoch, or None.
        n_epochs (int): Number of training epochs.

    Returns:
        tuple: ``(model, losses, accuracies, test_losses, test_accuracies)``
        where the four lists hold one value per epoch: mean per-sample
        training loss, training accuracy in percent, and the loss/accuracy
        pair returned by ``eval_model``.
    """
    # Follow the model's own device instead of guessing from CUDA availability,
    # so a CPU model on a CUDA machine does not hit a device mismatch.
    first_param = next(model.parameters(), None)
    if first_param is not None:
        device = first_param.device
    else:
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # With reduction='sum' the batch loss is already a per-batch total;
    # otherwise it is a batch mean and must be re-weighted by the batch size.
    loss_is_batch_mean = getattr(loss_fn, "reduction", "mean") != "sum"

    losses = []
    accuracies = []
    test_losses = []
    test_accuracies = []

    model.train()  # Set the model to train mode initially
    best_test_acc = 0
    for epoch in tqdm.tqdm(range(1, n_epochs + 1)):
        running_loss = 0.0
        running_correct = 0
        seen = 0

        for inputs, labels in train_dl:
            inputs = inputs.to(device)
            labels = labels.to(device) - 1  # Adjust labels to 0-based ids

            optimizer.zero_grad()
            outputs = model(inputs)
            predicted = torch.argmax(outputs, dim=-1)
            loss = loss_fn(outputs, labels)
            loss.backward()

            # Apply gradient clipping to prevent exploding gradients
            torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)

            optimizer.step()

            batch_size = labels.size(0)
            batch_loss = loss.item()
            if loss_is_batch_mean:
                batch_loss *= batch_size
            running_loss += batch_loss
            running_correct += (labels == predicted).sum().item()
            seen += batch_size

        # Normalise by the samples actually processed (robust to drop_last
        # and to an empty loader).
        denom = max(seen, 1)
        epoch_loss = running_loss / denom
        epoch_acc = running_correct / denom * 100.0

        logging.info(f"Epoch {epoch}/{n_epochs}, Train Loss: {epoch_loss:.4f}, Train Accuracy: {epoch_acc:.4f}%")

        losses.append(epoch_loss)
        accuracies.append(epoch_acc)

        model.eval()  # Switch to eval mode to evaluate on validation data
        test_loss, test_acc = eval_model(model, test_dl, loss_fn)
        test_losses.append(test_loss)
        test_accuracies.append(test_acc)

        if test_acc > best_test_acc:
            best_test_acc = test_acc
            logging.info('Model saved.')
            torch.save(model.state_dict(), 'best_model.pt')

        model.train()  # Switch back to train mode after validation

        # Plateau schedulers need the metric that matches their mode; every
        # other scheduler is stepped without arguments (a positional value
        # would be interpreted as an epoch index).
        if isinstance(scheduler, ReduceLROnPlateau):
            scheduler.step(test_acc if scheduler.mode == "max" else test_loss)
        elif scheduler is not None:
            scheduler.step()

    print('Finished Training')
    return model, losses, accuracies, test_losses, test_accuracies

def inference_with_augmentation(model, dataloader, device, num_augmentations=5):
    """Measure accuracy with test-time augmentation and majority voting.

    Every batch is passed through ``apply_augmentation`` several times; the
    class predicted most often across those passes is the final prediction
    for each sample.

    Args:
        model (torch.nn.Module): Trained model; it is switched to eval mode.
        dataloader: Iterable of ``(images, labels)`` batches.
        device (torch.device): Device the model lives on.
        num_augmentations (int): Augmented passes per batch used for voting.

    Returns:
        float: Accuracy as a fraction in [0, 1]; 0.0 when the loader yields
        no samples.
    """
    model.eval()
    correct = 0
    total = 0

    with torch.no_grad():
        for images, labels in dataloader:
            images = images.to(device)
            # Shift labels by -1 exactly like the training and evaluation
            # loops in this file do, so they are comparable to the model's
            # 0-based argmax predictions.
            labels = labels.to(device) - 1

            # One row of predictions per augmented pass
            votes = torch.stack(
                [
                    model(apply_augmentation(images).to(device)).argmax(dim=1)
                    for _ in range(num_augmentations)
                ],
                dim=0,
            )

            # Aggregate predictions (majority vote across passes)
            final_preds = torch.mode(votes, dim=0).values

            correct += (final_preds == labels).sum().item()
            total += labels.size(0)

    if total == 0:
        return 0.0
    return correct / total

def apply_augmentation(images, mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)):
    """Return a randomly augmented copy of a batch of normalised image tensors.

    The batch is first de-normalised back to the [0, 1] range so the PIL
    conversion does not wrap out-of-range values, then each image is randomly
    flipped, rotated and colour-jittered, and finally normalised again with
    the same statistics so the model sees inputs on its usual scale.

    Args:
        images (torch.Tensor): Batch of images normalised with ``mean`` and
            ``std``; may live on any device. Presumably shaped
            (batch, channels, height, width) -- TODO confirm with the loader.
        mean (sequence of float): Per-channel mean the batch was normalised
            with. Defaults to the values used by the ``Normalize`` transforms
            in this file.
        std (sequence of float): Per-channel std the batch was normalised
            with. Defaults to the values used by the ``Normalize`` transforms
            in this file.

    Returns:
        torch.Tensor: Augmented, re-normalised batch on the CPU.
    """
    augment = transforms.Compose([
        transforms.ToPILImage(),  # Convert tensor to PIL Image
        transforms.RandomHorizontalFlip(),
        transforms.RandomRotation(20),
        transforms.ColorJitter(brightness=0.5, contrast=0.5, saturation=0.5, hue=0.5),
        transforms.ToTensor(),  # Convert back to tensor in [0, 1]
        transforms.Normalize(mean=list(mean), std=list(std)),  # Restore model input scale
    ])

    cpu_images = images.detach().cpu()
    if cpu_images.size(0) == 0:
        # Nothing to augment; torch.stack would fail on an empty list.
        return cpu_images

    # Undo the normalisation so pixel values are valid for the PIL round trip.
    mean_t = torch.tensor(mean, dtype=cpu_images.dtype).view(-1, 1, 1)
    std_t = torch.tensor(std, dtype=cpu_images.dtype).view(-1, 1, 1)
    pixel_images = (cpu_images * std_t + mean_t).clamp(0.0, 1.0)

    # Apply an independent random augmentation to each image in the batch
    return torch.stack([augment(image) for image in pixel_images])

def main():
    """Cross-validate CifarCNN and report test-time-augmentation accuracy.

    Runs 5-fold cross-validation on the 195-class split. Each fold gets a
    fresh model, an Adam optimizer with weight decay and a plateau scheduler
    bound to that optimizer. After training, each fold is additionally scored
    on its validation split with ``inference_with_augmentation``. Afterwards
    the per-fold curves are plotted, the last fold's model is evaluated on
    the test set, the test metrics are passed to ``log_final_results`` and
    the weights are saved.

    Side effects: rebinds the module-level ``results_df`` to this run's
    per-fold results table and writes
    ``CifarCNN_195_augmentation_final_model.pth``.
    """
    global results_df
    model_name = "CifarCNN_195_augmentation"
    num_classes = 195
    k_folds = 5
    num_epochs = 10
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    transform = transforms.Compose([
        transforms.Resize((128, 128)),  # Resize to 128x128 for uniformity
        transforms.ToTensor(),  # Convert to Tensor
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),  # Normalize
    ])
    dataset = CarDataset(train_whit195, '/content/stanford_cars/car_data/car_data/train', transform=transform)

    # K-Fold Cross Validation setup
    kfold = KFold(n_splits=k_folds, shuffle=True, random_state=42)
    start_time = time.time()

    # Per-fold bookkeeping
    results_table = []
    all_train_losses = []
    all_val_losses = []
    all_val_accuracies = []

    # K-fold loop
    for fold, (train_ids, val_ids) in enumerate(kfold.split(dataset)):
        print(f'Fold {fold+1}/{k_folds}')

        # Subset the dataset for training and validation
        train_loader = CarDataLoader(Subset(dataset, train_ids), batch_size=32, shuffle=True, num_workers=4)
        val_loader = CarDataLoader(Subset(dataset, val_ids), batch_size=32, shuffle=False, num_workers=4)

        # Every fold starts from a fresh model, optimizer, loss and scheduler.
        model = CifarCNN(num_classes=num_classes).to(device)
        optimizer = optim.Adam(model.parameters(), lr=0.001, weight_decay=0.0001)
        loss_fn = CrossEntropyLoss()
        # train_model steps this scheduler with validation accuracy, so the
        # plateau is monitored in 'max' mode.
        scheduler = ReduceLROnPlateau(optimizer, 'max', patience=2, factor=0.5)

        # Train and evaluate the model
        model, train_losses, train_accuracies, val_losses, val_accuracies = train_model(
            model, train_loader, val_loader, loss_fn, optimizer, scheduler, n_epochs=num_epochs
        )

        # Inference-Time Augmentation (TTA) on the validation split.
        # inference_with_augmentation returns a fraction; scale it to percent
        # so it is comparable with the other accuracies recorded here.
        val_accuracy_with_tta = 100.0 * inference_with_augmentation(model, val_loader, device)

        results_table.append({
            "Model Name": model_name,
            "Fold": fold + 1,
            "Train Loss": train_losses[-1],
            "Validation Loss": val_losses[-1],
            "Validation Accuracy": val_accuracies[-1],
            "Test Loss": None,
            "Test Accuracy": None,
            "Validation Accuracy with TTA": val_accuracy_with_tta  # Add augmented accuracy
        })

        # Keep the full curves for plotting
        all_train_losses.append(train_losses)
        all_val_losses.append(val_losses)
        all_val_accuracies.append(val_accuracies)
        print(f"Fold {fold + 1}/{k_folds} - Train Loss: {train_losses[-1]}, Val Loss: {val_losses[-1]}")

    # Average the final-epoch validation metrics (plain and TTA) across folds
    avg_loss = sum(row["Validation Loss"] for row in results_table) / k_folds
    avg_accuracy = sum(row["Validation Accuracy"] for row in results_table) / k_folds
    avg_accuracy_with_tta = sum(row["Validation Accuracy with TTA"] for row in results_table) / k_folds

    print(f'\nAverage Loss: {avg_loss:.4f}, Average Accuracy: {avg_accuracy:.4f}')
    print(f'Average Accuracy with TTA: {avg_accuracy_with_tta:.4f}')

    # Plot Loss per fold
    for fold in range(k_folds):
        plot_metrics(
            [all_train_losses[fold], all_val_losses[fold]],
            title=f'Fold {fold+1} Loss',
            xlabel='Epochs',
            ylabel='Loss',
            legend_labels=['Train Loss', 'Validation Loss']
        )

    # Plot Validation Accuracy per fold
    for fold in range(k_folds):
        plot_metrics(
            [all_val_accuracies[fold]],
            title=f'Fold {fold+1} Validation Accuracy',
            xlabel='Epochs',
            ylabel='Accuracy (%)',
            legend_labels=['Validation Accuracy']
        )

    end_time = time.time()
    total_training_time = end_time - start_time

    # The model evaluated and saved below is the one trained on the last fold.
    test_dataset = CarDataset(test_whit195, '/content/stanford_cars/car_data/car_data/test', transform=transform)
    test_loss, test_accuracy = evaluate_model_on_test(model, test_dataset, loss_fn, device)
    log_final_results(model_name, test_accuracy, test_loss, total_training_time, results_df)

    model_save_path = f"{model_name}_final_model.pth"
    torch.save(model.state_dict(), model_save_path)
    print(f"Model saved at {model_save_path}")

    results_df = pd.DataFrame(results_table)
    print(f"\nFinal Test Loss: {test_loss:.4f}")
    print(f"Final Test Accuracy: {test_accuracy * 100:.2f}%")


if __name__ == '__main__':
    main()


#new category

In [None]:
def main():
    """Cross-validate CifarCNN on the 196-class training data, then test it.

    Steps:
      1. Build the image transform and the training ``CarDataset``.
      2. Run shuffled 5-fold cross-validation. Every fold gets a freshly
         initialised model, Adam optimizer and ReduceLROnPlateau scheduler,
         so the scheduler always controls the optimizer that is training.
      3. Print the fold-averaged final validation loss/accuracy and plot the
         per-fold loss and validation-accuracy curves.
      4. Evaluate the model produced by the last fold on the test set, log
         the result, and save its weights to ``<model_name>_final_model.pth``.

    Side effects:
        Rebinds the global ``results_df`` to a DataFrame with one row per
        fold, writes the weights file, prints progress and shows plots.
    """
    global results_df

    model_name = "CifarCNN_new_catgari_196"
    num_classes = 196
    k_folds = 5
    num_epochs = 10

    transform = transforms.Compose([
        transforms.Resize((128, 128)),  # Resize to 128x128 for uniformity
        transforms.ToTensor(),  # Convert to Tensor
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),  # Normalize
    ])
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    dataset = CarDataset(merged_data_train, '/content/stanford_cars/car_data/car_data/train', transform=transform)

    # The loss is created once and shared by all folds and by the final test
    # evaluation.
    loss_fn = CrossEntropyLoss()

    # K-Fold Cross Validation setup
    kfold = KFold(n_splits=k_folds, shuffle=True, random_state=42)

    # Per-fold bookkeeping
    results_table = []
    results = {}
    all_train_losses = []
    all_val_losses = []
    all_val_accuracies = []

    start_time = time.time()

    # K-fold loop
    for fold, (train_ids, val_ids) in enumerate(kfold.split(dataset)):
        print(f'Fold {fold+1}/{k_folds}')

        # Subset the dataset for training and validation and wrap in loaders
        train_loader = CarDataLoader(Subset(dataset, train_ids), batch_size=32, shuffle=True, num_workers=4)
        val_loader = CarDataLoader(Subset(dataset, val_ids), batch_size=32, shuffle=False, num_workers=4)

        # Fresh model and optimizer for every fold so folds stay independent.
        # NOTE(review): no weight decay is applied here; the previous revision
        # built an unused optimizer with weight_decay=0.0001 - confirm which
        # setting was intended.
        model = CifarCNN(num_classes=num_classes).to(device)
        optimizer = Adam(model.parameters(), lr=0.001)

        # The scheduler must wrap THIS fold's optimizer. A scheduler built
        # once outside the loop keeps adjusting the learning rate of an
        # optimizer that no longer trains anything.
        scheduler = ReduceLROnPlateau(optimizer, 'min', patience=2, factor=0.5)

        # Train and evaluate the model. The scheduler is handed to
        # train_model; stepping it again after the fold would only touch an
        # optimizer that is discarded on the next iteration, so no extra
        # step is made here.
        model, train_losses, train_accuracies, val_losses, val_accuracies = train_model(
            model, train_loader, val_loader, loss_fn, optimizer, scheduler, n_epochs=num_epochs
        )

        # Final-epoch metrics of this fold; test columns are left empty.
        results_table.append({
            "Model Name": model_name,
            "Fold": fold + 1,
            "Train Loss": train_losses[-1],
            "Validation Loss": val_losses[-1],
            "Validation Accuracy": val_accuracies[-1],
            "Test Loss": None,
            "Test Accuracy": None
        })

        # Store results
        results[fold] = {'loss': val_losses[-1], 'accuracy': val_accuracies[-1]}
        all_train_losses.append(train_losses)
        all_val_losses.append(val_losses)
        all_val_accuracies.append(val_accuracies)

    # Calculate average performance across all folds
    avg_loss = sum(fold_result['loss'] for fold_result in results.values()) / k_folds
    avg_accuracy = sum(fold_result['accuracy'] for fold_result in results.values()) / k_folds

    print(f'\nAverage Loss: {avg_loss:.4f}, Average Accuracy: {avg_accuracy:.4f}')

    # 1. Train/validation loss per fold
    for fold in range(k_folds):
        plot_metrics(
            [all_train_losses[fold], all_val_losses[fold]],
            title=f'Fold {fold+1} Loss',
            xlabel='Epochs',
            ylabel='Loss',
            legend_labels=['Train Loss', 'Validation Loss']
        )

    # 2. Validation accuracy per fold
    for fold in range(k_folds):
        plot_metrics(
            [all_val_accuracies[fold]],
            title=f'Fold {fold+1} Validation Accuracy',
            xlabel='Epochs',
            ylabel='Accuracy (%)',
            legend_labels=['Validation Accuracy']
        )

    end_time = time.time()
    total_training_time = end_time - start_time

    # `model` is the network trained in the last fold; it is the one that is
    # evaluated on the test set and saved.
    test_dataset = CarDataset(test_whit195, '/content/stanford_cars/car_data/car_data/test', transform=transform)
    test_loss, test_accuracy = evaluate_model_on_test(model, test_dataset, loss_fn, device)

    # NOTE(review): log_final_results receives the global results_df as it
    # was before this run; the per-fold table replaces it only afterwards.
    # Order kept as in the original - confirm this is intended.
    log_final_results(model_name, test_accuracy, test_loss, total_training_time, results_df)

    model_save_path = f"{model_name}_final_model.pth"
    torch.save(model.state_dict(), model_save_path)
    print(f"Model saved at {model_save_path}")

    results_df = pd.DataFrame(results_table)
    print(f"\nFinal Test Loss: {test_loss:.4f}")
    print(f"Final Test Accuracy: {test_accuracy * 100:.2f}%")
# Run main() only when this code is executed as the top-level module
# (__name__ == '__main__'), not when it is imported.
if __name__ == '__main__':
    main()
