In [None]:
import numpy as np
import torch
import torch.nn as nn
import torchvision as tv
import matplotlib.pyplot as plt
import torch.nn.functional as fn
import os
import random
import math as m
from PIL import Image

from einops import rearrange, reduce, asnumpy, parse_shape
from einops.layers.torch import Rearrange, Reduce

torch.cuda.is_available()
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
print(f"Device is {device}!")

In [None]:
#https://discuss.pytorch.org/t/how-to-add-noise-to-mnist-dataset-when-using-pytorch/59745

dataset = tv.datasets.CIFAR100(root="./Datasets", train=True, download=True, transform=tv.transforms.ToTensor())

transform = tv.transforms.Compose([
    tv.transforms.RandomHorizontalFlip(p=0.5),
    tv.transforms.ToTensor()
])

test_transform = tv.transforms.Compose([
    tv.transforms.ToTensor()
])

dataset = tv.datasets.CIFAR100(root="./Datasets", train=True, transform=transform, download=True)
dataset_verify = tv.datasets.CIFAR100(root="./Datasets", train=False, transform=test_transform, download=True)

print(f"Train set\n {dataset}")
print(f"Verify set\n {dataset_verify}")

dataloader = torch.utils.data.DataLoader(dataset, batch_size=256, drop_last=True, shuffle=True)
dataloader_verify = torch.utils.data.DataLoader(dataset_verify, batch_size=256, drop_last=True, shuffle=True)

In [None]:
for images in dataloader:
    print(len(images))
    print(images[0].shape)
    print(images[1].shape)
    
    print(images[1])
    break

In [None]:
for images, labels in dataloader:
    for x in range(1, 13):
        plt.subplot(2, 6, x)
        plt.imshow(images[x].permute(1, 2, 0), cmap=plt.cm.binary)
    break

In [None]:
class ResidualBlock(nn.Module):
    def __init__(self, in_chan, bottleneck):
        super(ResidualBlock, self).__init__()
        
        self.bottleneck_in = nn.Conv2d(in_chan, bottleneck, kernel_size=1)
        self.filter = nn.Conv2d(bottleneck, bottleneck, kernel_size=3, stride=1, padding=1, groups=bottleneck)
        self.combination = nn.Conv2d(bottleneck, bottleneck, kernel_size=1, stride=1)
        self.bottleneck_out = nn.Conv2d(bottleneck, in_chan, kernel_size=1)
        self.bn = nn.BatchNorm2d(in_chan)

        self.layers = nn.Sequential(
            self.bottleneck_in,
            nn.BatchNorm2d(bottleneck),
            nn.ReLU(),
            self.filter,
            nn.BatchNorm2d(bottleneck),
            nn.ReLU(),
            self.combination,
            nn.BatchNorm2d(bottleneck),
            nn.ReLU(),
            self.bottleneck_out,
            nn.ReLU())

    def forward(self, x):
        out = x
        out = out + self.layers(x)
        return self.bn(out)

In [None]:
block_example = ResidualBlock(384, 12)

random_arr = torch.rand((1, 384, 16, 16))
out = block_example(random_arr)

print(f"Parameter Number: {sum(p.numel() for p in block_example.parameters())}")

In [None]:
class ConvExpand(nn.Module):
    def __init__(self, in_chan, out_chan):
        super(ConvExpand, self).__init__()
        
        self.layers = nn.Sequential(nn.Conv2d(in_chan, out_chan, 1, 1),
                                    nn.BatchNorm2d(out_chan),
                                    nn.ReLU(inplace=False))

    def forward(self, x):            
        return self.layers(x)

In [None]:
class Discriminator(nn.Module):    
    def __init__(self):
        super(Discriminator, self).__init__()

        self.entry = nn.Sequential(nn.Conv2d(3, 24, 5, 1, 2),
                                   nn.BatchNorm2d(24),
                                   nn.ReLU(),
                                   ConvExpand(24, 48))

        self.convolutions1 = nn.Sequential(ResidualBlock(48, 6),
                                    ResidualBlock(48, 6),
                                    ResidualBlock(48, 6),
                                    ConvExpand(48, 96))
        

        self.convoltuions2 = nn.Sequential(ResidualBlock(96, 12),
                                             ResidualBlock(96, 12),
                                             ResidualBlock(96, 12),
                                             ConvExpand(96, 196))
        
        self.convolutions3 = nn.Sequential(ResidualBlock(196, 24),
                                             ResidualBlock(196, 24),
                                             ResidualBlock(196, 41))
        
        self.decider = nn.Linear(196, 100)

        
    def forward(self, x):
        x = self.entry(x)
        x = self.convolutions1(x)
        x = fn.max_pool2d(x, 2)

        x = self.convoltuions2(x)
        x = fn.max_pool2d(x, 2)

        x = self.convolutions3(x)
        x = fn.avg_pool2d(x, 8)
        
        x = torch.flatten(x, 1, 3)

        return self.decider(x)

model = Discriminator()
model = model.to(device)

learning_decay = 0.98

optimiser = torch.optim.Adam(model.parameters(), lr=0.005, weight_decay=0.00005)
scheduler = torch.optim.lr_scheduler.ExponentialLR(optimiser, gamma=learning_decay)

In [None]:
for images, labels in dataloader:
    images = images.to(device)
    output = model(images)
    print(output.shape)
    del output
    break

In [None]:
print(f"Parameter Number: {sum(p.numel() for p in model.parameters())}")

In [None]:
def get_accuracy(labels, output):
    _, argmax = torch.max(output, dim=1)
    training_accuracy = argmax.eq(labels).float().mean() * 100
    return training_accuracy.item()

In [None]:
os.environ['CUDA_LAUNCH_BLOCKING'] = "1"

In [None]:
epoch = 0
step_count = 0
do_continue = True
best_test_accuracy = 0.0

print("Start")

while do_continue:
    losses = []
    training_accuracies = []
    train_iter = iter(dataloader)
    model.train()
    for images, labels in train_iter:
        images, labels = images.to(device), labels.to(device)

        output = model(images)
        loss = torch.nn.functional.cross_entropy(output, labels)
        
        optimiser.zero_grad()
        loss.backward()
        optimiser.step()
        step_count += 1
        
        if step_count >= 15000:
            do_continue = False 
            break

        losses.append(loss.item())

        training_accuracies.append(get_accuracy(labels, output))
    
        if step_count % 100 == 0:
            print(f"{step_count} steps done!!!")

    test_losses = []
    test_accuracies = []
    model.eval()
    with torch.no_grad():
        for i, batch in enumerate(dataloader_verify):
            # sample x from the dataset
            x, labels = batch
            x, labels = x.to(device), labels.to(device)

            output = model(x)
            partial_loss = torch.nn.functional.cross_entropy(output, labels)
            test_accuracy = get_accuracy(labels, output)

            test_losses.append(partial_loss.item())
            test_accuracies.append(test_accuracy)
                
    epoch += 1
    average_train_accuracy = sum(training_accuracies)/len(training_accuracies)
    average_test_accuracy = sum(test_accuracies)/len(test_accuracies)

    if average_test_accuracy > best_test_accuracy:
        best_test_accuracy = average_test_accuracy
        torch.save(model.state_dict(), "best_basic_model")

    if epoch > 20:
        scheduler.step()
    print(f"At epoch {epoch}:")
    print(f"Train loss: {sum(losses)/len(losses)}, average train accuracy {average_train_accuracy}")
    print(f"At testing loss: {sum(test_losses)/len(test_losses)} and test accuracy {average_test_accuracy}")
    print(f"Num Steps: {step_count}")

In [None]:
# Create new dataloaders with image augmentation to regulirise the model

class AddGaussianNoise(object):
    def __init__(self, std=1.0):
        self.std = std
        
    def __call__(self, image):
        return image + torch.randn(image.shape) * self.std
    
class ApplyImageAugmentation(object):
    def __init__(self, probabilities=[0.7, 0.7]):
        self.probabilities = probabilities
        self.add_noise = AddGaussianNoise(0.015)
        self.crop = tv.transforms.RandomResizedCrop((32, 32), scale=(0.70, 1.0), ratio=(0.8, 1.2))

    def __call__(self, image):
        do_crop = random.uniform(0, 1)
        if do_crop < self.probabilities[0]:
            image =  self.crop(image)
        
        do_apply_noise = random.uniform(0, 1)
        if do_apply_noise < self.probabilities[1]:
            image = self.add_noise(image)

        return image

transform = tv.transforms.Compose([
    tv.transforms.RandomHorizontalFlip(p=0.5),
    tv.transforms.RandomVerticalFlip(p=0.5),
    tv.transforms.ColorJitter((0.8, 1.2), (0.8, 1.2), (0.8, 1.2), (-0.1, 0.1)),
    tv.transforms.RandomResizedCrop((32, 32), scale=(0.70, 1.0), ratio=(0.8, 1.2)),
    tv.transforms.ToTensor()
])

test_transform = tv.transforms.Compose([
    tv.transforms.ToTensor()
])

batch_size=256

dataset = tv.datasets.CIFAR100(root="./Datasets", train=True, transform=transform, download=True)
dataset_verify = tv.datasets.CIFAR100(root="./Datasets", train=False, transform=test_transform, download=True)

dataloader = torch.utils.data.DataLoader(dataset, batch_size=batch_size, drop_last=True, shuffle=True)
dataloader_verify = torch.utils.data.DataLoader(dataset_verify, batch_size=batch_size, drop_last=True, shuffle=True)

In [None]:
#Final training loop to create the training graph
from IPython import display as disp

steps = 0

test_loader = dataloader_verify
do_continue = True
plot_data = []
epoch = 0
max_test_accuracy = 0.0
max_test_accuracy_std = 0.0
attained_at_step = 0

while do_continue:

    # arrays for metrics
    train_loss_arr = np.zeros(0)
    train_acc_arr = np.zeros(0)
    test_acc_arr = np.zeros(0)

    train_iterator = iter(dataloader)
    model.train()
    # iterate through some of the train dateset
    for i, (x, t) in enumerate(train_iterator):
        x, t = x.to(device), t.to(device)

        optimiser.zero_grad()
        p = model(x)
        pred = p.argmax(dim=1, keepdim=True)
        loss = torch.nn.functional.cross_entropy(p, t)
        loss.backward()
        optimiser.step()
        steps += 1

        train_loss_arr = np.append(train_loss_arr, loss.cpu().data)
        train_acc_arr = np.append(train_acc_arr, pred.data.eq(t.view_as(pred)).float().mean().item())

        if steps >= 10000:
            do_continue = False
            break
        
    epoch += 1
    if epoch > 20:
        scheduler.step()

    model.eval()
    # iterate over the entire test dataset
    for x,t in test_loader:
        x,t = x.to(device), t.to(device)
        p = model(x)
        loss = torch.nn.functional.cross_entropy(p, t)
        pred = p.argmax(dim=1, keepdim=True)
        test_acc_arr = np.append(test_acc_arr, pred.data.eq(t.view_as(pred)).float().mean().item())

    mean_test_accuracy = test_acc_arr.mean()
    if mean_test_accuracy > max_test_accuracy:
        max_test_accuracy = mean_test_accuracy
        max_test_accuracy_std = test_acc_arr.std()
        attained_at_step = steps

    # print loss and accuracy data
    print('steps: {:.2f}, train loss: {:.3f}, train acc: {:.3f}±{:.3f}, test acc: {:.3f}±{:.3f}, max acc: {:.3f}±{:.3f} attained at {}'.format(
        steps, train_loss_arr.mean(),train_acc_arr.mean(),train_acc_arr.std(),test_acc_arr.mean(),test_acc_arr.std(), max_test_accuracy, max_test_accuracy_std, attained_at_step))

    # plot accuracy graph
    plot_data.append([steps, np.array(train_acc_arr).mean(), np.array(train_acc_arr).std(), np.array(test_acc_arr).mean(), np.array(test_acc_arr).std()])
    reward_list = []
    plt.plot([x[0] for x in plot_data], [x[1] for x in plot_data], '-', color='tab:grey', label="Train accuracy")
    plt.fill_between([x[0] for x in plot_data], [x[1]-x[2] for x in plot_data], [x[1]+x[2] for x in plot_data], alpha=0.2, color='tab:grey')
    plt.plot([x[0] for x in plot_data], [x[3] for x in plot_data], '-', color='tab:purple', label="Test accuracy")
    plt.fill_between([x[0] for x in plot_data], [x[3]-x[4] for x in plot_data], [x[3]+x[4] for x in plot_data], alpha=0.2, color='tab:purple')
    plt.xlabel('Steps')
    plt.ylabel('Accuracy')
    plt.legend(loc="upper left")
    plt.show()
    disp.clear_output(wait=True)