In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
import torchvision.datasets as datasets
import torchvision.transforms as transforms
import time
import numpy as np
import matplotlib.pyplot as plt
import random
import pandas as pd
import pickle

In [None]:
# Checking GPU availability
print(torch.cuda.is_available())

In [None]:
batch_size = 32
epochs = 10

In [None]:
# Image transformation pipeline
image_transforms = transforms.Compose([
    transforms.ToTensor(),  # Convert image to tensor
    transforms.CenterCrop(28)  # Crop from the center
])

In [None]:
# Function to normalize each image
def normalize_image(sample):
    img_tensor = sample[0]  # Image tensors
    label = sample[1]

    img_means = img_tensor.mean(axis=[1, 2])  # Mean of image per channel
    img_sds = img_tensor.std(axis=[1, 2])  # Overall SD of image per channel

    mean_sub = img_tensor - img_means.unsqueeze(1).unsqueeze(2)
    img_norm = mean_sub.true_divide(img_sds.unsqueeze(1).unsqueeze(2))

    return (img_norm, label)

In [None]:
# Dataset and DataLoader setup
all_train_data = list(datasets.CIFAR10(root='data/', transform=image_transforms, train=True, download=True))
random.shuffle(all_train_data)

train_data = all_train_data[:40000]
train_transformed = list(map(normalize_image, train_data))
train_loader = DataLoader(dataset=train_transformed, batch_size=batch_size, shuffle=True)

val_data = all_train_data[40000:]
val_transformed = list(map(normalize_image, val_data))
val_loader = DataLoader(dataset=val_transformed, batch_size=batch_size, shuffle=True)

test_data = datasets.CIFAR10(root='data/', transform=image_transforms, train=False, download=True)
test_transformed = list(map(normalize_image, val_data))
test_loader = DataLoader(dataset=val_transformed, batch_size=batch_size)

In [None]:
# Convolutional block module
class ConvBlock(nn.Module):
    def __init__(self, in_channels, out_channels, **kwargs):
        super(ConvBlock, self).__init__()
        self.relu = nn.ReLU()  # ReLU
        self.conv = nn.Conv2d(in_channels, out_channels, **kwargs)  # Convolution
        self.batchnorm = nn.BatchNorm2d(out_channels)  # Batch normalization

    def forward(self, x):
        x = self.conv(x)
        x = self.batchnorm(x)
        x = self.relu(x)
        return x

# Inception block module
class InceptionBlock(nn.Module):
    def __init__(self, in_channels, out_ch1, out_ch3):
        super(InceptionBlock, self).__init__()
        self.ch1 = ConvBlock(in_channels=in_channels, out_channels=out_ch1, kernel_size=(3, 3), stride=(1, 1), padding='same')
        self.ch3 = ConvBlock(in_channels=in_channels, out_channels=out_ch3, kernel_size=(3, 3), stride=(1, 1), padding='same')

    def forward(self, x):
        return torch.cat([self.ch1(x), self.ch3(x)], 1)

# Downsample block module
class DownsampleBlock(nn.Module):
    def __init__(self, in_channels, conv_out):
        super(DownsampleBlock, self).__init__()
        self.convblock = ConvBlock(in_channels, conv_out, kernel_size=(3, 3), stride=(2, 2))
        self.maxpool = nn.MaxPool2d(kernel_size=(3, 3), stride=(2, 2))

    def forward(self, x):
        return torch.cat([self.convblock(x), self.maxpool(x)], 1)

In [None]:
# Mini GoogLeNet model
class MiniGoogLeNet(nn.Module):
    def __init__(self, in_channels=3, num_classes=10, dropout_prob=0):
        super(MiniGoogLeNet, self).__init__()
        self.conv1 = ConvBlock(in_channels=3, out_channels=96, kernel_size=(3, 3), stride=(1, 1))
        self.inception1 = InceptionBlock(96, 32, 32)
        self.inception2 = InceptionBlock(64,32,48)
        self.downsample1 = DownsampleBlock(80,80)
        self.inception3 = InceptionBlock(160,112,48)
        self.inception4 = InceptionBlock(160,96,64)
        self.inception5 = InceptionBlock(160,80,80)
        self.inception6 = InceptionBlock(160,48,96)
        self.downsample2 = DownsampleBlock(144,96)
        self.inception7 = InceptionBlock(240,176,160)
        self.inception8 = InceptionBlock(336,176,160)
        self.avgpool = nn.AvgPool2d(kernel_size=(7,7), padding=(1,1))
        self.dropout = nn.Dropout(p = dropout_prob)
        self.fc = nn.Linear(336,10)

    def forward(self, x):
        x = self.conv1(x)
        x = self.inception1(x)
        x = self.inception2(x)
        x = self.downsample1(x)
        x = self.inception3(x)
        x = self.inception4(x)
        x = self.inception5(x)
        x = self.inception6(x)
        x = self.downsample2(x)
        x = self.inception7(x)
        x = self.inception8(x)
        x = self.avgpool(x)
        x = x.reshape(x.shape[0],-1)
        x = self.dropout(x)
        x = self.fc(x)
        return x

In [None]:
# create the model
model = MiniGoogLeNet(in_channels=3, num_classes=10, dropout_prob=0).to(device='cuda')

# parameter settings
loss_function = nn.CrossEntropyLoss() # loss funtion
optimizer = optim.SGD(model.parameters(), lr=0.01) # optimizer
scheduler = optim.lr_scheduler.LinearLR(optimizer) # scheduler

In [None]:
# initialize results
train_acc = []
train_all_loss = []

test_acc = []
test_all_loss = []

con_mats = []

times = []
train_times = []

In [None]:
# training
def train(epoch):
    model.train()
    curr_loss_train = 0 # loss
    correct_train = 0 # correctly classified training points
    total_train = 0 # total number of training points

    for ind, (data_train, true_labels_train) in enumerate(train_loader):
        data_train = data_train.to(device='cuda') # use GPU
        true_labels_train = true_labels_train.to(device='cuda') # use GPU

        out_train = model(data_train) # apply model to training data
        loss_train = loss_function(out_train, true_labels_train) # get loss

        optimizer.zero_grad()
        loss_train.backward()
        optimizer.step()

        curr_loss_train += loss_train.item()
        ix, predicted_train = out_train.max(1)
        correct_train += predicted_train.eq(true_labels_train).sum().item()
        total_train += true_labels_train.size(0)

    train_loss = curr_loss_train/len(train_loader) # get loss
    acc_train_val = (correct_train/total_train)*100 # get accuracy

    train_acc.append(acc_train_val)
    train_all_loss.append(train_loss)

In [None]:
# testing
def test(epoch):
    model.eval()
    curr_loss_test = 0 # loss
    correct_test = 0 # correctly classified test points
    total_test = 0 # total number of test points

    num_class = 10
    confusion_matrix = torch.zeros(num_class, num_class)
    with torch.no_grad():
        for data_test, true_labels_test in test_loader:

            data_test = data_test.to(device='cuda') # use GPU
            true_labels_test = true_labels_test.to(device='cuda') # use GPU

            out_test = model(data_test) # apply model to training data
            loss_test = loss_function(out_test, true_labels_test) # get loss

            # metrics
            curr_loss_test += loss_test.item()
            ix, predicted_test = out_test.max(1)
            correct_test += predicted_test.eq(true_labels_test).sum().item()
            total_test += true_labels_test.size(0)


    test_loss = curr_loss_test/len(test_loader) # get loss
    acc_test_val = (correct_test/total_test)*100 # get accuracy

    test_acc.append(acc_test_val)
    test_all_loss.append(test_loss)
    con_mats.append(confusion_matrix)

In [None]:
train_start = time.time()
for epoch in range(epochs):
    ep_start = time.time()
    print(f"Epoch {epoch}")
    train(epoch)
    test(epoch)
    scheduler.step()

    epoch_time = time.time() - ep_start
    print(f"Epoch time: {epoch_time:0.2f} seconds")
    times.append(epoch_time)

    train_time = time.time() - train_start
    train_times.append(train_time)

print()
print(f"Total training time: {train_time:0.2f} seconds")

In [None]:
# save all results to a dataframe for export
df_res = pd.DataFrame()
df_res['TrainAccuracy'] = train_acc
df_res['TrainLoss'] = train_all_loss
df_res['TestAccuracy'] = test_acc
df_res['TestLoss'] = test_all_loss
df_res['EpochTime'] = times
df_res['Totaltime'] = train_times

df_res.to_csv('results.csv', index=False)

In [None]:
# export all confusion matrices to a pickle
with open('confusion_matrices.pickle','wb') as handle:
    pickle.dump(con_mats, handle)

In [None]:
pd.set_option('display.max_rows', None)
df_res

In [None]:
max_accuracy = df_res['TestAccuracy'].max()
print(f"Highest accuracy reached: {max_accuracy}%")