In [4]:
# Import statements needed
import torch
import torchvision.transforms as transforms
import torchvision.datasets as datasets
from torch import nn
from torch import optim
import numpy as np
import matplotlib.pyplot as plt
from torch.utils.data import Dataset
import os
import torchvision
import time
import pandas as pd
import random
import copy

In [5]:
# Pick the compute device: prefer the GPU whenever CUDA is usable, else fall back to the CPU
cuda_ready = torch.cuda.is_available()
print(cuda_ready)
device = "cuda" if cuda_ready else "cpu"
device

True


'cuda'

## Set up dataset + Helper Methods

In [6]:
# Download the Caltech101 data into the local "dataset" folder.
# This only needs to run once; comment the cell out after the first successful run.
# NOTE: the `dataset` name bound here is rebound by the ImageFolder cell that follows.
dataset = datasets.Caltech101(
    root="dataset",
    download=True
)

Files already downloaded and verified


In [7]:
# Build the path with os.path.join so it resolves on every OS. The previous hard-coded
# Windows string also relied on the invalid escape sequence "\c".
# This should not need to change, but switch to an absolute path if the relative one fails.
path_to_data = os.path.join('dataset', 'caltech101', '101_ObjectCategories')

# ImageFolder is used because extracting the class names from the datasets.Caltech101
# object was problematic.
dataset = datasets.ImageFolder(path_to_data)

In [15]:
# Function to filter the dataset and keep only the top k classes + return fresh model for it
def create_subset(dataset, top_k, image_size, seed = 42, test_split = .8, percent_data = 1.00, create_k1 = False):
    """Keep the ``top_k`` most frequent classes and return train/test splits plus a fresh model.

    Parameters
    ----------
    dataset : full dataset. Must expose ``classes`` and yield ``(image, label)`` pairs.
        Its ``transform`` attribute is overwritten (resize + ToTensor).
    top_k : how many of the most frequent categories to keep.
    image_size : images are resized to ``image_size x image_size``.
    seed : seed used for the random splits and for sampling the outlier images (default 42).
    test_split : fraction of the used data placed in the first (train) split (default .8).
    percent_data : fraction of the data to use at all; the remainder is discarded
        (default 1.00 = full dataset).
    create_k1 : when True, additionally build a K+1 dataset (Outlier Test Scenario):
        the top-k samples plus at most as many samples from all other classes,
        all labelled ``top_k``.

    Returns
    -------
    ``(train, test, class_mapping, model)``, or
    ``(train, test, train_k, test_k, class_mapping, model)`` when ``create_k1`` is True.
    ``class_mapping`` is a list of ``(class_name, new_label)`` with labels re-indexed to
    ``0 .. k-1``. ``model`` is a pretrained SqueezeNet 1.1 from torch.hub; its
    classifier head is not resized here.
    """
    assert test_split >= 0.0 and test_split <= 1.0

    # Apply transforms to fit the desired sizes
    dataset.transform = transforms.Compose([
        transforms.Resize((image_size, image_size)),
        transforms.ToTensor(),
    ])

    # Read the labels without decoding every image when the dataset exposes them
    # (ImageFolder does via `targets`); otherwise fall back to a full pass.
    labels = None
    if getattr(dataset, "target_transform", None) is None:
        labels = getattr(dataset, "targets", None)
    if labels is None:
        labels = [label for _, label in dataset]
    labels = list(labels)

    # Find the highest occurring image categories (so we have more data for training)
    count = {}
    for label in labels:
        count[label] = count.get(label, 0) + 1
    # Sort in descending order as a list of tuples: [(label, count), ...]
    sorted_labels = sorted(count.items(), key=lambda item: item[1], reverse=True)
    top_k_class = [class_label for class_label, _ in sorted_labels[:top_k]]

    # Original label -> new label in 0..k-1 (e.g. k = 2 gives labels 0 and 1)
    new_index = {old: new for new, old in enumerate(top_k_class)}
    class_mapping = [(dataset.classes[old], new) for new, old in enumerate(top_k_class)]

    def _load(idx):
        # Fetch one transformed image; single-channel (B&W) images are expanded to
        # 3 channels by duplicating the channel so every sample has the same shape.
        img, _ = dataset[idx]
        if img.shape[0] == 1:
            img = img.repeat(3, 1, 1)
        return img

    def _split(samples):
        # Seeded split into train / test / discarded remainder.
        generator = torch.Generator().manual_seed(seed)
        fractions = [test_split * percent_data, (1 - test_split) * percent_data, 1 - percent_data]
        train_part, test_part, _ = torch.utils.data.random_split(samples, fractions, generator = generator)
        return train_part, test_part

    # Samples of the selected classes only, with re-indexed labels
    new_dataset = [(_load(i), new_index[label]) for i, label in enumerate(labels) if label in new_index]

    # For K+1: add outlier samples, channel-normalised exactly like the known classes
    if create_k1:
        unknown_indices = [i for i, label in enumerate(labels) if label not in new_index]
        # Seeded shuffle so the outlier selection is reproducible too
        random.Random(seed).shuffle(unknown_indices)
        k_data = list(new_dataset)
        k_data.extend((_load(i), top_k) for i in unknown_indices[:len(new_dataset)])

    train, test = _split(new_dataset)

    # Return a fresh model to train on
    model = torch.hub.load('pytorch/vision:v0.10.0', 'squeezenet1_1', pretrained = True)

    # EfficientNet was frankly too good: it converged to 100% accuracy rapidly and would make for poor comparisons
    #model = EfficientNet.from_pretrained('efficientnet-b0', num_classes=top_k)
    if create_k1:
        train_k, test_k = _split(k_data)
        return train, test, train_k, test_k, class_mapping, model

    return train, test, class_mapping, model

In [16]:
def train_test_loop(loss_func, optimizer, epochs, scheduler, train_dataloader, test_dataloader, model):
    """Train ``model`` for ``epochs`` epochs, then evaluate it once on the test data.

    Batches are moved to the device the model's parameters live on, so the function
    does not depend on a notebook-level ``device`` variable.

    Parameters
    ----------
    loss_func : callable ``(predictions, labels) -> loss tensor``.
    optimizer : optimizer updating the model's parameters.
    epochs : number of passes over ``train_dataloader``.
    scheduler : learning-rate scheduler; stepped once per epoch.
    train_dataloader, test_dataloader : loaders yielding ``(inputs, labels)`` batches.
    model : the network to train and evaluate.

    Returns
    -------
    ``(train_accuracy, train_losses, test_accuracy, test_losses, train_time, test_time)``
    where the two train entries are per-epoch lists, the two test entries are scalars
    and the times are wall-clock seconds.
    """
    # Run on whatever device the model was placed on (CPU for a parameter-free model)
    first_param = next(model.parameters(), None)
    run_device = first_param.device if first_param is not None else torch.device("cpu")

    # variables needed for metrics later
    train_losses = []
    train_accuracy = []
    train_size = len(train_dataloader.dataset)
    train_num_batches = len(train_dataloader)

    ############################ Train Loop ############################
    start_time_train = time.time()
    for _ in range(epochs):
        # makes sure to set model to train
        model.train()
        train_loss = 0
        train_correct = 0
        for X, labels in train_dataloader:
            # Make sure values are on correct device
            X = X.to(run_device)
            labels = labels.to(run_device)

            # Model pred + loss
            pred = model(X)
            loss = loss_func(pred, labels)

            # Backprop
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            # Compute metrics
            train_loss += loss.item()
            train_correct += (pred.argmax(axis = 1) == labels).type(torch.float).sum().item()
        # Per-epoch metrics: mean batch loss and fraction of correct samples
        train_losses.append(train_loss / train_num_batches)
        train_accuracy.append(train_correct / train_size)
        # Update scheduler
        scheduler.step()
    train_time = time.time() - start_time_train
    ############################ Train Loop ############################

    ############################ Test Loop #############################
    test_size = len(test_dataloader.dataset)
    test_num_batches = len(test_dataloader)
    # makes sure to set model to eval
    model.eval()
    start_time_test = time.time()
    test_loss = 0
    test_correct = 0
    with torch.no_grad():
        for X, labels in test_dataloader:
            # Make sure values are on correct device
            X = X.to(run_device)
            labels = labels.to(run_device)

            # Model pred + loss
            pred = model(X)
            loss = loss_func(pred, labels)

            # Compute metrics
            test_loss += loss.item()
            test_correct += (pred.argmax(axis = 1) == labels).type(torch.float).sum().item()
    test_losses = test_loss / test_num_batches
    test_accuracy = test_correct / test_size
    test_time = time.time() - start_time_test
    ############################ Test Loop #############################

    return train_accuracy, train_losses, test_accuracy, test_losses, train_time, test_time

## Main Training/Testing Loop

In [None]:
# Compact high-level loop that runs everything and allows modification of all variables.
# Every run appends one row to `records`; the results DataFrame `df` is built once at the end.
graph_data = []
records = []
columns = ["Size", "K", "Device", "Train_Time", "Test_Time", "Train_Acc", "Test_Acc", "Percent_data_used", "Training_Samples", "Testing_Samples"]
# Only benchmark CUDA when a GPU is actually available
devices = ['cuda', 'cpu'] if torch.cuda.is_available() else ['cpu']
for percent in [1]:
    for size in [64, 128]:
        for k in [10, 50]:
            for device in devices:
                train, test, class_mapping, model = create_subset(dataset, k, size, percent_data = percent)
                model.to(device)
                ############################################# HYPER PARAMS #############################################
                batch_size = 16
                loss_func = nn.CrossEntropyLoss()
                lr = .001
                weight_decay = .0001
                optimizer = optim.Adam(model.parameters(), lr=lr, weight_decay=weight_decay)
                epochs = 20
                # step_size must be an integer: with the float epochs/3 the check
                # `last_epoch % step_size == 0` never fires and the learning rate never decays.
                # Set gamma < 1 to add the effect of a scheduler and gamma = 1 to disable it.
                scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=max(1, epochs // 3), gamma=.5)
                ############################################# HYPER PARAMS #############################################
                train_dataloader = torch.utils.data.DataLoader(train, batch_size=batch_size)
                test_dataloader = torch.utils.data.DataLoader(test, batch_size=batch_size)
                train_accuracy, train_losses, test_accuracy, test_losses, train_time, test_time = train_test_loop(loss_func, optimizer, epochs, scheduler, train_dataloader, test_dataloader, model)
                # Add results to table
                data = [size, k, device, train_time, test_time, train_accuracy[-1], test_accuracy, percent, len(train), len(test)]
                records.append(data)
                print(data)
                graph_data.append((data, train_accuracy, train_losses))

# Build the table once instead of concatenating a one-row frame per run
df = pd.DataFrame(records, columns=columns)


### Accuracy curves, used to judge how many epochs are needed

In [None]:
# One training-accuracy curve per run stored in graph_data
for data, train_accuracy, train_losses in graph_data:
    # The x range follows this run's own history rather than the global `epochs`,
    # which other cells rebind to a different value.
    x = np.arange(len(train_accuracy))
    plt.title("Train Accuracy Curve")
    # Epochs are on the x-axis and accuracy on the y-axis (the labels were swapped before)
    plt.xlabel("Epochs")
    plt.ylabel("Accuracy")
    plt.plot(x, train_accuracy, color ="red", label = "Train Accuracy")
    plt.legend()
    plt.show()

### Size of model

In [None]:
# CODE TAKEN FROM ChatGPT (credit to them for this block of code)
# Load SqueezeNet 1.1 without pretrained weights; it is passed to get_model_size below.
model = torch.hub.load('pytorch/vision:v0.10.0', 'squeezenet1_1', pretrained = False)
def get_model_size(model):
    """Return the memory taken by the model's parameters in megabytes (1 MB = 1024**2 bytes).

    Each parameter contributes ``numel() * element_size()`` bytes, so the result is
    correct for any parameter dtype (float16, float64, ...), not only 32-bit floats.
    Only ``model.parameters()`` are counted.
    """
    total_bytes = sum(p.numel() * p.element_size() for p in model.parameters())
    return total_bytes / (1024 ** 2)  # Convert bytes to megabytes (MB)
# Parameter size of the model loaded above, in MB; the bare `x` displays it as the cell output
x = get_model_size(model)
x

### Save Results

In [None]:
# Write the results table `df` to results.csv, omitting the index column
df.to_csv('results.csv', index=False)

### Generate Images

In [None]:
# Show a row of random training images for every category, for each (size, k) combination
IMAGES_PER_CATEGORY = 5
for size in [64, 128]:
    for k in [10, 50]:
        print(f"Size: {size}, K: {k}")
        # Create the subset
        train, test, class_mapping, model = create_subset(dataset, k, size)

        # Group the training images by label in a single pass instead of rescanning
        # the whole training split once per category
        images_by_label = {}
        for img, lbl in train:
            images_by_label.setdefault(lbl, []).append(img)

        # Generate and display up to 5 random images per category as one row
        for class_name, label in class_mapping:
            category_images = images_by_label.get(label, [])
            if not category_images:
                continue
            # Never request more samples than the category has (random.sample would raise)
            n_show = min(IMAGES_PER_CATEGORY, len(category_images))
            chosen_images = random.sample(category_images, n_show)

            # One 2x2 inch cell per image; squeeze=False keeps `axes` 2-D even for one image
            fig, axes = plt.subplots(1, n_show, figsize=(2 * n_show, 2), squeeze=False)
            for i, (ax, img) in enumerate(zip(axes[0], chosen_images)):
                # imshow expects H x W x C images, not C x H x W
                ax.imshow(img.permute(1, 2, 0).numpy())
                if i == 0:  # title only on the first image of the row
                    ax.set_title(class_name)
                ax.axis('off')
            plt.show()
            # Release the figure; many are created in this loop
            plt.close(fig)

# K + 1 test, Outlier Test Scenario 

In [None]:
# Fall back to the CPU when no GPU is available
device = 'cuda' if torch.cuda.is_available() else 'cpu'
# Image size used for every run below. It is defined here so the "Size" column records
# the size actually used instead of a stale `size` left over from an earlier cell.
size = 64
columns = ["Size", "K", "Train_Time", "Test_Time", "Train_Acc", "Test_Acc", "Training_Samples", "Testing_Samples"]
records = []
# For every base k run the plain k-class experiment first, then the outlier experiment
for base_k in [2, 5, 10, 15]:
    for with_outlier in (False, True):
        if with_outlier:
            k = base_k + 1
            # NOTE(review): create_subset(..., create_k1=True) already adds one outlier label on
            # top of `top_k`, so passing k = base_k + 1 yields base_k + 2 labels - confirm intent.
            _, _, train, test, class_mapping, model = create_subset(dataset, k, size, create_k1= True)
        else:
            k = base_k
            train, test, class_mapping, model = create_subset(dataset, k, size)
        model.to(device)
        ############################################# HYPER PARAMS #############################################
        batch_size = 16
        loss_func = nn.CrossEntropyLoss()
        lr = .001
        weight_decay = .0001
        optimizer = optim.Adam(model.parameters(), lr=lr, weight_decay=weight_decay)
        epochs = 10
        # step_size must be an integer: with the float epochs/3 the check
        # `last_epoch % step_size == 0` never fires and the learning rate never decays.
        # Set gamma < 1 to add the effect of a scheduler and gamma = 1 to disable it.
        scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=max(1, epochs // 3), gamma=.5)
        ############################################# HYPER PARAMS #############################################
        train_dataloader = torch.utils.data.DataLoader(train, batch_size=batch_size)
        test_dataloader = torch.utils.data.DataLoader(test, batch_size=batch_size)
        train_accuracy, train_losses, test_accuracy, test_losses, train_time, test_time = train_test_loop(loss_func, optimizer, epochs, scheduler, train_dataloader, test_dataloader, model)
        # Add results to table
        data = [size, k, train_time, test_time, train_accuracy[-1], test_accuracy, len(train), len(test)]
        records.append(data)
        print(data)

# Build the table once instead of concatenating a one-row frame per run
df = pd.DataFrame(records, columns=columns)

In [18]:
# Write the K+1 (outlier scenario) results table `df` to results_k.csv, omitting the index column
df.to_csv('results_k.csv', index=False)