# Load CIFAR-10 dataset

In [1]:
import torch
import torch.nn as nn
import torchvision
import torchvision.transforms as transforms
import random

# --- Experiment configuration -------------------------------------------------
TRAIN_SPLIT_PERCENTAGE = 0.8
VALIDATION_SPLIT_PERCENTAGE = 1 - TRAIN_SPLIT_PERCENTAGE
RANDOM_SEED = 265
EPOCH_COUNT = 30
BATCH_SIZE = 256

# Seed both Python's and PyTorch's global generators so the split, the weight
# initialisation and the loader shuffling are repeatable.
random.seed(RANDOM_SEED)
torch.manual_seed(RANDOM_SEED)

# Newly created floating point tensors (including model weights) use float64.
torch.set_default_dtype(torch.double)

# --- Preprocessing ------------------------------------------------------------
# Image -> tensor -> float64 -> per-channel (x - 0.5) / 0.5, which maps pixel
# values from [0, 1] to [-1, 1].
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.ConvertImageDtype(torch.double),
    transforms.Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5)),
])

# --- Datasets -----------------------------------------------------------------
# Both splits share the same storage location and preprocessing.
_cifar10_kwargs = dict(root='./data', download=True, transform=transform)
cifar10_train = torchvision.datasets.CIFAR10(train=True, **_cifar10_kwargs)
cifar10_test = torchvision.datasets.CIFAR10(train=False, **_cifar10_kwargs)

Files already downloaded and verified
Files already downloaded and verified


# Filter dataset to 'airplane' and 'bird' only

In [4]:
# Class names of the dataset; a name's position in this list is used below as
# its numeric target value.
CIFAR10_LABELS = list(cifar10_test.classes)
INCLUDED_LABELS = ['airplane', 'bird']

print("Labels: ", CIFAR10_LABELS)
print("Included labels: ", INCLUDED_LABELS)

# Numeric targets of the classes we keep, in CIFAR10_LABELS order.
included_labels_indices = [
    index for index in range(len(CIFAR10_LABELS))
    if CIFAR10_LABELS[index] in INCLUDED_LABELS
]


def _positions_with_included_target(targets):
    """Return the dataset positions whose target is one of the kept classes."""
    return [
        position for position, target in enumerate(targets)
        if target in included_labels_indices
    ]


cifar10_train_included_indices = _positions_with_included_target(cifar10_train.targets)
cifar10_test_included_indices = _positions_with_included_target(cifar10_test.targets)

Labels:  ['airplane', 'automobile', 'bird', 'cat', 'deer', 'dog', 'frog', 'horse', 'ship', 'truck']
Included labels:  ['airplane', 'bird']


# Split and create loaders

In [6]:
# Draw the training share of the kept samples at random; every kept sample that
# was not drawn goes to the validation set (original dataset order preserved).
train_indices = random.sample(
    cifar10_train_included_indices,
    int(TRAIN_SPLIT_PERCENTAGE * len(cifar10_train_included_indices)),
)

# Test membership against a set: checking each candidate against the list of
# training indices made this comprehension quadratic in the dataset size.
train_index_set = set(train_indices)
val_indices = [i for i in cifar10_train_included_indices if i not in train_index_set]

# Subsets are views onto the full datasets, so samples keep their original
# CIFAR-10 target values.
train_subset = torch.utils.data.Subset(cifar10_train, train_indices)
val_subset = torch.utils.data.Subset(cifar10_train, val_indices)
test_subset = torch.utils.data.Subset(cifar10_test, cifar10_test_included_indices)

train_loader = torch.utils.data.DataLoader(train_subset, batch_size=BATCH_SIZE, shuffle=True, num_workers=2)
val_loader = torch.utils.data.DataLoader(val_subset, batch_size=BATCH_SIZE, shuffle=True, num_workers=2)
test_loader = torch.utils.data.DataLoader(test_subset, batch_size=BATCH_SIZE, shuffle=False, num_workers=2)

print(f"Train: {len(train_subset)} entries")
print(f"Validate: {len(val_indices)} entries")
print(f"Test: {len(cifar10_test_included_indices)} entries")

Train: 8000 entries
Validate: 2000 entries
Test: 2000 entries


# Visualize the dataset
I will now visualize and analyse the dataset.

## Obvious biases?
To identify obvious biases in the dataset, I need to visualize the distribution of labels.  
If one label is grossly overrepresented, I might have to downsample it.

## What does the data look like?
The labels say birds and planes, but I want to know exactly what I am working with.  
Therefore I want to show one example of each label.

In [7]:
import matplotlib.pyplot as plt
import numpy as np

def show_data_example(image_tensor):
    """Display a normalized image tensor with matplotlib.

    Args:
        image_tensor: Image tensor whose first axis is moved last before
            plotting (i.e. channel-first input), with values normalized using
            mean 0.5 and standard deviation 0.5.
    """
    # Invert the normalization: x * std + mean.
    unnormalized = image_tensor * 0.5 + 0.5

    # imshow expects the channel axis last, so move axis 0 to the end.
    channel_last = unnormalized.permute(1, 2, 0).numpy()

    plt.imshow(channel_last)
    plt.show()

def _collect_one_example_per_class(classes, loader):
    """Return the first image found for each entry of `classes`, in discovery order."""
    remaining = list(classes)
    examples = []
    if not remaining:
        return examples

    for images, labels in loader:
        for image, target in zip(images, labels.tolist()):
            label = classes[target]
            if label not in remaining:
                continue
            examples.append(image)
            remaining.remove(label)
            # Stop as soon as every class is covered instead of pulling (and
            # preprocessing) the rest of the loader for nothing.
            if not remaining:
                return examples

    return examples


def show_data_examples(classes, loader):
    """Show one example image per class, combined into a single image grid.

    Args:
        classes: Sequence of class names. It is indexed with the raw label
            values yielded by `loader`, so it must be long enough to cover
            every label value that can occur.
        loader: Iterable yielding `(images, labels)` batches.

    Classes that never occur in `loader` are simply left out of the grid. If no
    example is found at all, nothing is displayed.
    """
    examples = _collect_one_example_per_class(classes, loader)
    if not examples:
        # make_grid cannot build a grid from an empty list.
        return

    show_data_example(torchvision.utils.make_grid(examples))

def show_label_distribution(classes, loader):
    """Plot a bar chart with the number of samples per class in `loader`.

    Args:
        classes: Sequence of class names, indexed with the raw label values
            yielded by `loader`. Every name gets a bar, including those with
            zero samples.
        loader: Iterable yielding `(inputs, labels)` batches; inputs are ignored.
    """
    counts = dict.fromkeys(classes, 0)

    for _, labels in loader:
        for target in labels.tolist():
            counts[classes[target]] += 1

    plt.bar(counts.keys(), counts.values())
    plt.show()

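# Uncomment to inspect the data. Pass the full CIFAR-10 class list: the loaders
# yield the original CIFAR-10 targets (0 = airplane, 2 = bird), which are used
# directly as indices into `classes`.
# show_label_distribution(CIFAR10_LABELS, train_loader)
# show_data_examples(CIFAR10_LABELS, train_loader)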

In [8]:
class MyMLP(nn.Module):
    """Fully connected network mapping a 3x32x32 image to two output scores.

    Layer widths are 3072 -> 512 -> 128 -> 32 -> 2, with ReLU after every layer
    except the last, so the outputs are raw (unnormalized) scores.
    """

    def __init__(self):
        super().__init__()
        self.input = nn.Linear(3 * 32 * 32, 512)
        self.hidden1 = nn.Linear(512, 128)
        self.hidden2 = nn.Linear(128, 32)
        self.hidden3 = nn.Linear(32, 2)

        self.activation = nn.ReLU()

    def forward(self, x):
        """Return one row of two scores per sample in the batch `x`."""
        # Collapse everything but the batch axis into one feature vector.
        features = x.flatten(start_dim=1)

        for layer in (self.input, self.hidden1, self.hidden2):
            features = self.activation(layer(features))

        scores = self.hidden3(features)
        # Already (batch, 2) at this point; flattening again leaves it as is.
        return scores.flatten(start_dim=1)

In [9]:
def map_labels_to_neurons(y_true):
    """Turn a batch of target values into two-column one-hot rows.

    A target equal to 0 becomes `[1, 0]`; any other target becomes `[0, 1]`.

    Args:
        y_true: 1-D tensor (or tensor-like) of target values.

    Returns:
        Tensor of shape `(len(y_true), 2)` in the current default floating
        point dtype, on the same device as `y_true`. An empty input yields an
        empty `(0, 2)` tensor.
    """
    targets = torch.as_tensor(y_true)
    is_first_class = targets == 0

    # Column 0 marks "target == 0", column 1 marks everything else.
    one_hot = torch.stack((is_first_class, ~is_first_class), dim=1)
    return one_hot.to(torch.get_default_dtype())

def map_neurons_to_labels(y_pred):
    """Translate predicted output-neuron indices into dataset class indices.

    Neuron `k` stands for `INCLUDED_LABELS[k]`, while the true targets are
    positions in `CIFAR10_LABELS`; this converts the former into the latter so
    the two can be compared directly.

    Args:
        y_pred: 1-D tensor of output-neuron indices.

    Returns:
        Tensor with the corresponding positions in `CIFAR10_LABELS`.
    """
    neuron_to_class_index = [CIFAR10_LABELS.index(name) for name in INCLUDED_LABELS]
    return torch.tensor([neuron_to_class_index[neuron] for neuron in y_pred.tolist()])

def compute_accuracy(model, test_loader):
    """Measure classification accuracy of `model` over all batches of a loader.

    Args:
        model: Callable mapping an input batch to per-class scores of shape
            `(batch, classes)`.
        test_loader: Iterable yielding `(X, y_true)` batches.

    Returns:
        Tuple `(accuracy, total, correct)`. `accuracy` is `correct / total`, or
        `0.0` when the loader yields no samples.
    """
    correct = 0
    total = 0

    # Evaluate in eval mode so layers that behave differently during training
    # do not distort the measurement; the previous mode is restored afterwards
    # so an ongoing training run is unaffected.
    was_training = getattr(model, 'training', False)
    if was_training:
        model.eval()

    try:
        with torch.no_grad():
            for X, y_true in test_loader:
                scores = model(X)

                # The neuron with the highest score is the predicted class;
                # translate it to the index space used by y_true.
                predicted = map_neurons_to_labels(torch.argmax(scores, dim=1))

                total += y_true.size(0)
                correct += (predicted == y_true).sum().item()
    finally:
        if was_training:
            model.train()

    # An empty loader would otherwise divide by zero.
    accuracy = correct / total if total else 0.0
    return accuracy, total, correct

def train(epoch_count, optimizer, model, loss_function, train_loader, eval_mid_training=False, validation_loader=None):
    """Train `model` for a fixed number of epochs and report its accuracy.

    Args:
        epoch_count: Number of passes over `train_loader`.
        optimizer: Optimizer already bound to the parameters of `model`.
        model: Callable mapping an input batch to per-class scores.
        loss_function: Callable `(y_pred, y_target) -> loss`, where `y_target`
            is the output of `map_labels_to_neurons`.
        train_loader: Iterable yielding `(X, y_true)` training batches.
        eval_mid_training: When True, training and validation accuracy are
            computed after every epoch and collected in the returned metrics.
            This slows training down considerably.
        validation_loader: Iterable yielding validation batches. Defaults to
            the notebook-level `val_loader` when omitted.

    Returns:
        Tuple `(performance, epoch_metrics)`. `performance` holds the final
        `accuracy_train` and `accuracy_val`. `epoch_metrics` is a list with one
        dict per epoch (`epoch`, `loss`, `accuracy_train`, `accuracy_val`) and
        is empty unless `eval_mid_training` is set.
    """
    if validation_loader is None:
        # Keep existing callers working: they rely on the loader created at
        # notebook level.
        validation_loader = val_loader

    epoch_metrics = []

    for epoch in range(epoch_count):

        # Sum of the batch losses and number of batches, used to report the
        # mean batch loss of the epoch.
        epoch_loss = 0.0
        batch_count = 0

        for X, y_true in train_loader:

            # One gradient descent step
            optimizer.zero_grad()
            y_pred = model(X)

            # Map the true labels to the layout of the output neurons so they
            # can be compared with the prediction
            y_target = map_labels_to_neurons(y_true)

            loss = loss_function(y_pred, y_target)
            loss.backward()
            optimizer.step()

            epoch_loss += loss.item()
            batch_count += 1

        # An empty loader has no meaningful loss; report NaN instead of
        # dividing by zero.
        mean_loss = epoch_loss / batch_count if batch_count else float('nan')

        # Optional per-epoch metrics, e.g. for plotting learning curves.
        if eval_mid_training:
            train_accuracy, _, _ = compute_accuracy(model, train_loader)
            val_accuracy, _, _ = compute_accuracy(model, validation_loader)

            epoch_metrics.append({
                'epoch': epoch + 1,
                'loss': mean_loss,
                'accuracy_train': train_accuracy,
                'accuracy_val': val_accuracy
            })

        # Print the loss every fifth epoch, starting with the first
        if epoch % 5 == 0:
            print(f'Epoch {epoch + 1} | Training loss: {mean_loss}')

    train_accuracy, _, _ = compute_accuracy(model, train_loader)
    val_accuracy, _, _ = compute_accuracy(model, validation_loader)
    performance = {
        'accuracy_train': train_accuracy,
        'accuracy_val': val_accuracy
    }

    return performance, epoch_metrics

SyntaxError: invalid syntax. Perhaps you forgot a comma? (2264158472.py, line 63)

In [None]:
def train_and_eval(model, learning_rate, momentum, weight_decay):
    """Train `model` with PyTorch's SGD and report the resulting accuracy.

    Training runs for `EPOCH_COUNT` epochs on `train_loader` with
    cross-entropy loss.

    Args:
        model: Module to train (updated in place).
        learning_rate: SGD learning rate.
        momentum: SGD momentum factor.
        weight_decay: SGD weight decay.

    Returns:
        The performance dict produced by `train`, holding `accuracy_train` and
        `accuracy_val`.
    """
    print("=========================================================")
    print("Current parameters:")
    print(f"Learning rate: {learning_rate}")
    print(f"Momentum: {momentum}")
    print(f"Weight decay: {weight_decay}")

    print("")
    print("--------- Using Pytorch's SGD ---------")
    loss_function = nn.CrossEntropyLoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate, momentum=momentum, weight_decay=weight_decay)
    performance_sgd, _ = train(EPOCH_COUNT, optimizer, model, loss_function, train_loader)

    # Surface the evaluation result instead of discarding it, so the different
    # hyper-parameter settings can actually be compared.
    print(f"Training accuracy: {performance_sgd['accuracy_train']:.4f}")
    print(f"Validation accuracy: {performance_sgd['accuracy_val']:.4f}")

    return performance_sgd

def train_and_eval_all():
    """Train and evaluate a fresh `MyMLP` for every hyper-parameter setting.

    Prints the settings shared by all runs first, then hands one new model per
    setting to `train_and_eval`.
    """
    # One row per run: (learning rate, momentum, weight decay)
    hyper_parameter_grid = (
        (.01, .0, .000),
        (.01, .0, .010),
        (.01, .9, .000),
        (.01, .9, .010),
        (.01, .9, .001),
        (.01, .8, .010),
    )

    print("Global parameters:")
    print("Batch size: ", BATCH_SIZE)
    print("Epoch count: ", EPOCH_COUNT)
    print("Loss function: CrossEntropyLoss")
    print("Seed: ", RANDOM_SEED)

    for learning_rate, momentum, weight_decay in hyper_parameter_grid:
        # Every setting starts from a newly initialised network.
        train_and_eval(MyMLP(), learning_rate, momentum, weight_decay)