In [None]:
import torch
import torch.nn as nn
import torchvision.transforms as transforms, torchvision
from torch.utils.data import DataLoader, TensorDataset
from torchvision import datasets
import torchvision.models as models
import pickle
import numpy as np
import pandas as pd
import os
import cv2
from sklearn.decomposition import PCA
from PIL import Image
from sklearn.metrics import classification_report, confusion_matrix


In [None]:
''' This model was run on Google Colab because I do not have a GPU '''
''' You can run these cells and you will have everything necessary to run or load the model '''

''' Load the dataset '''
def downloadDataset():
    """Fetch the CIFAR-10 train and test splits into ``./data``.

    Returns:
        tuple: ``(trainset, testset)`` -- the two torchvision ``CIFAR10``
        dataset objects, training split first.
    """
    # train=True selects the training split, train=False the test split;
    # the training split is requested first, exactly as before.
    train_split, test_split = (
        torchvision.datasets.CIFAR10(root='./data', train=wants_train, download=True)
        for wants_train in (True, False)
    )
    return train_split, test_split

''' Load and unpickle the dataset '''
## Cifar-10 dataset comes in batches that need to be combined and unpickled to form full dataset
def loadAndUnpickleBatch(path):
    """Read one pickled CIFAR-10 batch file and return its images and labels.

    Args:
        path: Location of the pickled batch file.

    Returns:
        tuple: ``(images, labels)`` where ``images`` is the batch's ``b'data'``
        array reshaped to ``(-1, 3, 32, 32)`` and then reordered to
        channels-last ``(N, 32, 32, 3)``, and ``labels`` is ``b'labels'``
        converted to a NumPy array.
    """
    # NOTE: pickle can execute arbitrary code; only use this on trusted files.
    with open(path, 'rb') as handle:
        payload = pickle.load(handle, encoding='bytes')
    raw_pixels, raw_labels = payload[b'data'], payload[b'labels']
    # Rows are stored channel-first; move the channel axis to the end.
    channels_last = raw_pixels.reshape(-1, 3, 32, 32).transpose(0, 2, 3, 1)
    return channels_last, np.array(raw_labels)
#Training data is divided into 5 batches
def loadAllBatches(data_dir='./data/cifar-10-batches-py', num_batches=5):
    """Load every training batch file and combine them into one dataset.

    Args:
        data_dir: Directory holding the ``data_batch_<i>`` files. Defaults to
            the location the download step in this notebook writes to.
        num_batches: Number of batch files to read, numbered from 1.

    Returns:
        tuple: ``(combined_images, combined_labels)`` -- the per-batch arrays
        returned by ``loadAndUnpickleBatch`` concatenated along the first axis.
    """
    images = []
    labels = []
    for i in range(1, num_batches + 1):
        path = os.path.join(data_dir, f'data_batch_{i}')
        image_batch, label_batch = loadAndUnpickleBatch(path)
        images.append(image_batch)
        labels.append(label_batch)
    # Concatenate once after the loop: doing it per iteration re-copies all
    # previously loaded data every time a batch is added.
    combined_images = np.concatenate(images)
    combined_labels = np.concatenate(labels)
    return combined_images, combined_labels
#Test data is in a single batch
def loadTestBatch():
    """Load the single test batch file.

    Returns:
        tuple: ``(images, labels)`` as produced by ``loadAndUnpickleBatch``
        for ``./data/cifar-10-batches-py/test_batch``.
    """
    return loadAndUnpickleBatch('./data/cifar-10-batches-py/test_batch')

''' Sort images and labels into folders '''
def sortData(images, labels, folder):
    """Write every image as a PNG into ``<folder>/label_<label>/image_<idx>.png``.

    Args:
        images: Iterable of image arrays. Three-channel images are assumed to
            be in RGB order, which is what the loaders in this notebook
            produce -- confirm if images come from another source.
        labels: Iterable of labels, parallel to ``images``.
        folder: Root output directory; created if missing.
    """
    os.makedirs(folder, exist_ok=True)
    # Create a folder for each label 0-9 up front (kept from the original so
    # empty classes still get a directory).
    created_dirs = set()
    for label in range(10):
        label_dir = os.path.join(folder, f'label_{label}')
        os.makedirs(label_dir, exist_ok=True)
        created_dirs.add(label_dir)
    # Save images into the corresponding label folder
    for idx, (image, label) in enumerate(zip(images, labels)):
        label_dir = os.path.join(folder, f'label_{label}')
        if label_dir not in created_dirs:
            # A label outside 0-9 would otherwise make imwrite fail silently.
            os.makedirs(label_dir, exist_ok=True)
            created_dirs.add(label_dir)
        image_filename = os.path.join(label_dir, f'image_{idx}.png')
        # cv2.imwrite interprets 3-channel data as BGR; convert from RGB so the
        # saved file does not have its red and blue channels swapped.
        if getattr(image, 'ndim', 0) == 3 and image.shape[2] == 3:
            image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
        cv2.imwrite(image_filename, image)
    print('Data sorted into folders')

''' Reduce size of dataset '''
def get_first_n_images_per_class(images, labels, n):
    """Keep at most the first ``n`` samples of each class 0-9.

    Args:
        images: Array indexed along its first axis in parallel with ``labels``.
        labels: NumPy array of class labels.
        n: Maximum number of samples to keep per class.

    Returns:
        tuple: ``(images, labels)`` subsets, grouped by class in ascending
        label order and keeping the original order within each class.
    """
    # Indices of the first n occurrences of every class, in class order.
    per_class_indices = [np.where(labels == cls)[0][:n] for cls in range(10)]
    image_groups = [images[picked] for picked in per_class_indices]
    label_groups = [labels[picked] for picked in per_class_indices]
    return np.concatenate(image_groups), np.concatenate(label_groups)

def save_cifar10_images_by_label(features, labels, output_folder, n=100):
    """Save up to ``n`` images per label as ``<output_folder>/<label>/image_<idx>.png``.

    Images are visited in input order; ``idx`` in the file name is the image's
    position in ``features``. Iteration stops early once every distinct label
    in ``labels`` has ``n`` saved images.

    Args:
        features: Iterable of image arrays accepted by ``PIL.Image.fromarray``.
        labels: Labels parallel to ``features``; each is converted with ``int``.
        output_folder: Root output directory; created if missing.
        n: Maximum number of images to save for each label.
    """
    os.makedirs(output_folder, exist_ok=True)

    # Loop-invariant: computed once instead of on every saved image.
    num_distinct_labels = len(np.unique(labels))
    label_counts = {}
    labels_at_quota = 0  # how many labels have reached n saved images

    for idx, (image, label) in enumerate(zip(features, labels)):
        # Convert label to integer so it makes a clean folder name / dict key
        label = int(label)
        # Skip if we've saved `n` images for this label
        if label_counts.get(label, 0) >= n:
            continue
        # Create a subfolder for the label if it doesn't exist
        label_folder = os.path.join(output_folder, str(label))
        os.makedirs(label_folder, exist_ok=True)
        # Save the image with original index as filename
        image_path = os.path.join(label_folder, f"image_{idx}.png")
        Image.fromarray(image).save(image_path)
        label_counts[label] = label_counts.get(label, 0) + 1
        if label_counts[label] == n:
            labels_at_quota += 1
        # Done once every distinct label has been seen and has hit its quota
        if len(label_counts) == num_distinct_labels and labels_at_quota == len(label_counts):
            break

    print(f"Saved {n} images per label in {output_folder}")

if __name__ == '__main__':
    # Step 1: download CIFAR-10 into ./data. The returned dataset objects are
    # not used below; the call is made for its download side effect.
    trainset, testset = downloadDataset()
    # Step 2: read the raw pickled batch files into NumPy arrays.
    # Training split (all training batch files combined)
    train_images, train_labels = loadAllBatches()
    # Test split (single batch file)
    test_images, test_labels = loadTestBatch()

    # Step 3: write a reduced dataset to disk, one sub-folder per label:
    # at most 500 images per label for training and 100 per label for test.
    # Training
    save_cifar10_images_by_label(train_images, train_labels, "Sorted/Training", n=500)
    # Test
    save_cifar10_images_by_label(test_images, test_labels, "Sorted/Test", n=100)

    # Sorted dataset will be used as is for CNN model


In [None]:
class VGG11(nn.Module):
    """VGG11-style CNN with batch normalisation for 3x32x32 inputs.

    Eight convolutional blocks (Conv -> BatchNorm -> ReLU, five of them
    followed by a 2x2 max-pool) reduce a 32x32 image to a 512x1x1 feature map,
    which is flattened and classified by three fully connected layers.

    Attribute names (``conv_block1`` ... ``conv_block8``, ``fc_block``) and the
    layer order inside each block are kept stable so state-dict keys do not
    change.

    Args:
        num_classes: Size of the output layer.
    """

    @staticmethod
    def _conv_block(in_channels, out_channels, pool, kernel_size=3):
        """Build Conv -> BatchNorm -> ReLU, optionally followed by MaxPool(2, 2).

        Padding is ``kernel_size // 2`` so odd kernels preserve spatial size.
        """
        layers = [
            nn.Conv2d(in_channels, out_channels, kernel_size=kernel_size,
                      stride=1, padding=kernel_size // 2),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(),
        ]
        if pool:
            layers.append(nn.MaxPool2d(kernel_size=2, stride=2))
        return nn.Sequential(*layers)

    def __init__(self, num_classes=10):
        super().__init__()

        # Spatial size after each block for a 32x32 input is noted on the right.
        # conv_block1 previously used kernel_size=2 with padding=1, which
        # contradicted its Conv(3, 64, 3, 1, 1) spec and produced a 33x33 map.
        self.conv_block1 = self._conv_block(3, 64, pool=True)      # 32 -> 16
        self.conv_block2 = self._conv_block(64, 128, pool=True)    # 16 -> 8
        self.conv_block3 = self._conv_block(128, 256, pool=False)  # 8
        self.conv_block4 = self._conv_block(256, 256, pool=True)   # 8 -> 4
        ### Experiment variant: kernel_size=5 (padding 2) for conv_block5
        self.conv_block5 = self._conv_block(256, 512, pool=False)  # 4
        ### Experiment variant: kernel_size=5 (padding 2) for conv_block6
        self.conv_block6 = self._conv_block(512, 512, pool=True)   # 4 -> 2
        self.conv_block7 = self._conv_block(512, 512, pool=False)  # 2
        self.conv_block8 = self._conv_block(512, 512, pool=True)   # 2 -> 1
        ### Experiment variant ("added layer"): a ninth block
        ### _conv_block(512, 512, pool=False) applied after conv_block8
        ### (no max-pool, otherwise the feature map would be too small).

        # Fully connected classifier head
        self.fc_block = nn.Sequential(
            nn.Linear(512 * 1 * 1, 4096),  # Linear(512, 4096)
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(4096, 4096),  # Linear(4096, 4096)
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(4096, num_classes)  # Linear(4096, num_classes)
        )

    def forward(self, x):
        """Return class logits of shape ``(batch, num_classes)`` for input ``x``.

        Args:
            x: Image batch of shape ``(batch, 3, 32, 32)``; other spatial sizes
                would not match the 512-feature input of the classifier head.
        """
        # Forward pass through all convolutional blocks
        x = self.conv_block1(x)
        x = self.conv_block2(x)
        x = self.conv_block3(x)
        x = self.conv_block4(x)
        x = self.conv_block5(x)
        x = self.conv_block6(x)
        x = self.conv_block7(x)
        x = self.conv_block8(x)

        # Flatten everything except the batch dimension: (batch, 512*1*1)
        x = torch.flatten(x, 1)

        # Forward pass through fully connected layers
        return self.fc_block(x)

In [None]:
### Run to train

if __name__ == "__main__":
    # Use the CUDA device when one is available, otherwise fall back to the CPU.
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"Using {device}")

    # Single source of truth for the epoch count (used by the loop and the log line).
    NUM_EPOCHS = 10

    # Import data and preprocess to input into model.
    # The sorted on-disk dataset is used as is, so preprocessing happens here.

    # Define preprocessing for train and test datasets
    transform = transforms.Compose([
        transforms.ToTensor(),  # Convert image to PyTorch tensor (C x H x W)
        transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])  # Normalize to [-1, 1]
    ])
    # Load train and test datasets (one sub-folder per class)
    train_dataset = datasets.ImageFolder(root="Sorted/Training", transform=transform)
    test_dataset = datasets.ImageFolder(root="Sorted/Test", transform=transform)
    # Create data loaders for training and testing to use in PyTorch
    train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True, num_workers=4)
    test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False, num_workers=4)

    # Initialize the VGG11 model, loss and optimizer
    model = VGG11(num_classes=10)
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=0.01, momentum=0.9)
    # Move the model to the device selected above
    model.to(device)

    # Training loop
    for epoch in range(NUM_EPOCHS):
        model.train()
        running_loss = 0.0
        for images, labels in train_loader:
            images, labels = images.to(device), labels.to(device)

            # Forward pass
            outputs = model(images)
            loss = criterion(outputs, labels)

            # Backward pass and optimization
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            running_loss += loss.item()
        # Print the mean training loss over this epoch's batches
        print(f"Epoch [{epoch+1}/{NUM_EPOCHS}], Loss: {running_loss/len(train_loader):.4f}")

    # Evaluate the model on the test set and get accuracy
    model.eval()
    correctlyClassified = 0
    totalSamples = 0
    # Disable gradient tracking because we don't need it for evaluation
    with torch.no_grad():
        for images, labels in test_loader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            _, predicted = torch.max(outputs, 1)
            totalSamples += labels.size(0)
            correctlyClassified += (predicted == labels).sum().item()

    print(f"Test Accuracy: {100 * correctlyClassified / totalSamples:.2f}%")

    # Save the entire model object (not just the state dict); the evaluation
    # cell loads this file with torch.load and expects a full module.
    torch.save(model, 'CNNclassifier.pth')


In [None]:


def evaluate_vgg11_with_confusion_matrix(model, test_loader, device):
    """Run ``model`` over ``test_loader`` and print/return the confusion matrix.

    Args:
        model: Classifier whose output has one score per class along dim 1.
        test_loader: Iterable yielding ``(inputs, targets)`` tensor batches.
        device: Device the batches are moved to before the forward pass.

    Returns:
        The matrix produced by ``sklearn.metrics.confusion_matrix`` with true
        labels as the first argument and predictions as the second.
    """
    model.eval()
    y_true, y_pred = [], []

    # No gradients are needed for inference.
    with torch.no_grad():
        for batch_inputs, batch_targets in test_loader:
            batch_inputs = batch_inputs.to(device)
            batch_targets = batch_targets.to(device)

            # Predicted class = index of the highest score in each row.
            scores = model(batch_inputs)
            batch_pred = torch.max(scores, 1)[1]

            y_pred.extend(batch_pred.cpu().numpy())
            y_true.extend(batch_targets.cpu().numpy())

    cm = confusion_matrix(y_true, y_pred)
    print("Confusion Matrix:\n", cm)
    return cm

def evalModel(model, test_loader=None, device=None):
    """Print and return the accuracy (in percent) of ``model`` on a test set.

    Args:
        model: Classifier whose output has one score per class along dim 1.
        test_loader: Iterable yielding ``(images, labels)`` tensor batches.
            When omitted, the notebook-level global ``test_loader`` is used,
            which is what the original one-argument form relied on.
        device: Device to evaluate on. When omitted, the notebook-level global
            ``device`` is used.

    Returns:
        float: Accuracy as a percentage, or ``nan`` if the loader yields no
        samples.

    Raises:
        NameError: If an argument is omitted and the matching global is not
            defined.
    """
    # Backward compatibility with evalModel(model): fall back to the globals.
    if test_loader is None:
        if "test_loader" not in globals():
            raise NameError("name 'test_loader' is not defined")
        test_loader = globals()["test_loader"]
    if device is None:
        if "device" not in globals():
            raise NameError("name 'device' is not defined")
        device = globals()["device"]

    # Make sure the model lives on the same device as the batches.
    model.to(device)
    model.eval()
    correctlyClassified = 0
    totalSamples = 0
    with torch.no_grad():
        for images, labels in test_loader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            _, predicted = torch.max(outputs, 1)
            totalSamples += labels.size(0)
            correctlyClassified += (predicted == labels).sum().item()

    if totalSamples == 0:
        # Avoid a ZeroDivisionError on an empty loader.
        print("Test Accuracy: n/a (no samples)")
        return float("nan")

    accuracy = 100 * correctlyClassified / totalSamples
    print(f"Test Accuracy: {accuracy:.2f}%")
    return accuracy

def evaluate_cnn_model(model, test_loader, device):
    """Evaluate ``model`` on ``test_loader`` and collect classification metrics.

    Args:
        model: Classifier whose output has one score per class along dim 1.
        test_loader: Iterable yielding ``(inputs, targets)`` tensor batches.
        device: Device the batches are moved to before the forward pass.

    Returns:
        dict with keys:
            ``'accuracy'``: fraction of correct predictions,
            ``'class_metrics'``: ``{0..9: {'precision', 'recall', 'f1-score'}}``,
            ``'average_metrics'``: macro-averaged precision, recall, f1-score,
            ``'confusion_matrix'``: output of ``confusion_matrix``.

    Raises:
        KeyError: If one of the classes 0-9 is absent from the sklearn report
            (it appears in neither the targets nor the predictions).
    """
    # Set the model to evaluation mode
    model.eval()

    # Initialize lists to store true labels and predicted labels
    all_preds = []
    all_labels = []

    with torch.no_grad():
        for inputs, targets in test_loader:
            # Move inputs and targets to the same device as the model
            inputs, targets = inputs.to(device), targets.to(device)
            # Predicted class = index of the highest score in each row
            outputs = model(inputs)
            _, predicted = torch.max(outputs, 1)
            # Append predictions and true labels to the lists
            all_preds.extend(predicted.cpu().numpy())
            all_labels.extend(targets.cpu().numpy())

    # Calculate accuracy
    accuracy = np.mean(np.array(all_preds) == np.array(all_labels))

    # Get classification metrics using sklearn
    report = classification_report(all_labels, all_preds, output_dict=True)

    # Extract precision, recall, f1 score for each class
    class_metrics = {class_label: {
        'precision': report[str(class_label)]['precision'],
        'recall': report[str(class_label)]['recall'],
        'f1-score': report[str(class_label)]['f1-score']
    } for class_label in range(10)}

    # Macro averages. Precision previously reused report['accuracy'], which made
    # the reported precision a copy of the accuracy column.
    avg_metrics = {
        'precision': report['macro avg']['precision'],
        'recall': report['macro avg']['recall'],
        'f1-score': report['macro avg']['f1-score']
    }

    # Confusion matrix
    cm = confusion_matrix(all_labels, all_preds)

    # Return metrics as a dictionary
    metrics = {
        'accuracy': accuracy,
        'class_metrics': class_metrics,
        'average_metrics': avg_metrics,
        'confusion_matrix': cm
    }

    return metrics

def evaluate_average_metrics(models, model_names, test_loader, device):
    """Build a comparison table of summary metrics, one row per model.

    Each model is moved to ``device`` and evaluated with ``evaluate_cnn_model``.

    Args:
        models: Models to evaluate.
        model_names: Display names, parallel to ``models``.
        test_loader: Iterable yielding ``(inputs, targets)`` tensor batches.
        device: Device to evaluate on.

    Returns:
        pandas.DataFrame with columns ``Model Name``, ``Accuracy``,
        ``Precision``, ``Recall`` and ``F1-Score``.
    """
    rows = []
    for net, display_name in zip(models, model_names):
        net.to(device)
        result = evaluate_cnn_model(net, test_loader, device)
        averages = result['average_metrics']
        # One table row per model: name, accuracy and the averaged metrics.
        rows.append({
            'Model Name': display_name,
            'Accuracy': result['accuracy'],
            'Precision': averages['precision'],
            'Recall': averages['recall'],
            'F1-Score': averages['f1-score'],
        })

    return pd.DataFrame(rows)

   
if __name__ == "__main__":
    # Use the CUDA device when one is available, otherwise fall back to the CPU.
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"Using {device}")

    # Import data and preprocess to input into model.
    # The sorted on-disk dataset is used as is, so preprocessing happens here.
    transform = transforms.Compose([
        transforms.ToTensor(),  # Convert image to PyTorch tensor
        transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])
    ])
    # Load train and test datasets
    train_dataset = datasets.ImageFolder(root="Sorted/Training", transform=transform)
    test_dataset = datasets.ImageFolder(root="Sorted/Test", transform=transform)
    # Create data loaders for training and testing to use in PyTorch
    train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True, num_workers=4)
    test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False, num_workers=4)

    # Loading and evaluating all saved CNN models.
    # Each '<name>.pth' file holds an entire pickled model object.
    model_names = ["CNNclassifier", "CNNclassifierAddLayer", "CNNclassifierChangeKernelsize5","CNNclassifierChangeKernelsize5_6","CNNclassifierDoubleEpochs"]

    models = []
    for model_name in model_names:
        # map_location lets checkpoints saved on a GPU load on a CPU-only
        # machine. weights_only=False is required to unpickle a whole model on
        # PyTorch versions where weights_only defaults to True; unpickling can
        # execute arbitrary code, so only load files you created yourself.
        # NOTE(review): a whole-model pickle is rebuilt with the VGG11 class
        # currently defined in this notebook -- confirm it matches the variant
        # each file was trained with.
        loaded_model = torch.load(f'{model_name}.pth', map_location=device, weights_only=False)
        evalModel(loaded_model)
        confusion_mat = evaluate_vgg11_with_confusion_matrix(loaded_model, test_loader, device)
        models.append(loaded_model)

    # Keep the individual notebook-level names available for interactive use.
    (CNNclassifier, CNNclassifierAddLayer, CNNclassifierChangeKernelsize5,
     CNNclassifierChangeKernelsize5_6, CNNclassifierDoubleEpochs) = models

    # Evaluate the models and get a table with average metrics
    comparison_df = evaluate_average_metrics(models,model_names, test_loader, device)
    print(comparison_df)


