<a href="https://colab.research.google.com/github/Mhobo/SimNets/blob/main/Jupyter_RobustnessCode.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import torch
import torch.nn as nn
import torchvision
import torchvision.transforms as transforms
import torchvision.transforms.functional as TF
import numpy as np
from torch.utils.data import DataLoader, Subset
from sklearn.model_selection import train_test_split
import torch.nn.functional as F
import random as rnd
from scipy.stats import bernoulli
import time
import copy
import matplotlib.pyplot as plt
import pickle

In [None]:
class AddGaussianNoise(object):
    def __init__(self, mean=0., std=1.):
        self.std = std
        self.mean = mean
        
    def __call__(self, tensor):
        return tensor + torch.randn(tensor.size()) * self.std + self.mean
    
    def __repr__(self):
        return self.__class__.__name__ + '(mean={0}, std={1})'.format(self.mean, self.std)

In [None]:
batch_size = 128
num_workers = 8
Changes = [transforms.GaussianBlur(5,5),transforms.RandomHorizontalFlip(),transforms.RandomVerticalFlip()]
# [transforms.Pad(4), transforms.RandomCrop(28)], problem cos list
# transforms.RandomHorizontalFlip(),transforms.RandomVerticalFlip(),

In [None]:
test_rob_transform = transforms.Compose([
    Changes[i], # (3,3) Gives like a 3% drop, (5,5) like an 8% drop
    transforms.ToTensor()])

#transforms.RandomHorizontalFlip(),
#transforms.RandomVerticalFlip()
# transforms.Pad(4), transforms.RandomCrop(28)
# transforms.GaussianBlur(5,5)
# AddGaussianNoise(0., 1.)

# FashionMNIST dataset
# Dataset objects and Dataloader objects are objects Python defines
train_dataset = torchvision.datasets.CIFAR10(root='data/',
                                             train=True, 
                                             transform=transforms.ToTensor(),  #replace with = transform after
                                             download=True)

test_rob_dataset = torchvision.datasets.CIFAR10(root='data/',
                                            train=False,#need comma here to get transforms back
                                            transform=test_rob_transform)

# Data loader
train_loader = torch.utils.data.DataLoader(dataset=train_dataset,
                                            batch_size= batch_size, 
                                            shuffle=True, num_workers = num_workers)


test_rob_loader = torch.utils.data.DataLoader(dataset=test_rob_dataset,
                                            batch_size=128, 
                                            shuffle=False, num_workers = num_workers)

In [None]:
def train(P_DML_ss = 0,P_DML = 1, P_SS =1, ss = False):  
  net.train()
  net2.train()


  for i, (images, labels) in enumerate(train_loader):
    
    images_flat = torch.flatten(images.to(device),start_dim = 1) # Will want to flatten this in  my case
    images = images.to(device)
    labels = labels.to(device)

    guesses, _ = net(images_flat)
    loss = loss_fn(guesses,labels)

    guesses2, _ = net2(images_flat)
    loss2 = loss_fn(guesses2,labels)
    
    #DML_Loss
    loss_DML = P_DML*loss_fn(guesses,guesses2.softmax(dim=1).detach())
    loss += loss_DML

    loss_DML2 = P_DML*loss_fn(guesses2,guesses.softmax(dim=1).detach())
    loss2 += loss_DML2

    if ss == True: # Rotation Predictions
      bx = images_flat
      
      curr_batch_size = bx.size(0)
      by_prime = torch.cat((torch.zeros(bx.size(0)), torch.ones(bx.size(0)),
                                  2*torch.ones(bx.size(0)), 3*torch.ones(bx.size(0))), 0).long()
      
      bx = images
      bx = torch.cat((bx, torch.rot90(bx,1,[2,3]),torch.rot90(bx, 2,[2,3]), torch.rot90(bx, 3,[2,3])), 0)
      bx = torch.flatten(bx,start_dim=1)
      
      bx, by_prime = bx.to(device), by_prime.to(device)

      _, pen = net(bx)
      _, pen2 = net2(bx)

      ss_guesses = net.rot_pred(pen)
      ss_guesses2 = net2.rot_pred(pen2)

      ss_loss = P_SS*loss_fn(ss_guesses, by_prime)
      ss_loss2 = P_SS*loss_fn(ss_guesses2, by_prime)

      loss_DML_ss = P_DML_ss*loss_fn(ss_guesses,ss_guesses2.softmax(dim=1).detach())
      loss += loss_DML_ss

      loss_DML2_ss = P_DML_ss*loss_fn(ss_guesses2,ss_guesses.softmax(dim=1).detach())
      loss2 += loss_DML2_ss


      loss += ss_loss
      loss2 += ss_loss2

    optimizer.zero_grad() 
    
    loss.backward()
    loss2.backward()
    optimizer.step()

  scheduler.step(loss)

  return loss

In [None]:
def test(test_loader = test_rob_loader):
  net.eval()
  net2.eval()
  with torch.no_grad():
      correct = 0
      total = 0

      correct2 = 0
      total2 = 0      

      for images, labels in test_loader:
        images = torch.flatten(images.to(device),start_dim = 1)
        labels = labels.to(device)
        guesses,_ = net(images)
        _, predicted = torch.max(guesses.data, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()
        test_accuracy = correct / total

        guesses2,_ = net2(images)
        _, predicted2 = torch.max(guesses2.data, 1)
        total2 += labels.size(0)
        correct2 += (predicted2 == labels).sum().item()
  return test_accuracy

In [None]:
# the layers we want in between, need somewhere to check dimension of dataset?
class MLP(nn.Module):
  def __init__(self,layer_widths): 
# Alternatively 
    super().__init__()
    self.m = len(layer_widths) #self stores it as characteristic of this NN
    self.layers = [] # Empty list to store layers
    self.non_lin = nn.ReLU() # nn.ReLU isn't the function, but the function that constructs a relu (by saying =)
    for i in range(self.m-1): #does this make m-1 iterations?
      self.layers.append(nn.Linear(layer_widths[i], layer_widths[i+1])) # Need matrix maths to work, so have (a,b) thenn (b,c)
      self.add_module("layer"+str(i), self.layers[-1])
 
  def forward(self, x):
    z = x
    for i in range(self.m - 1):
      y = z
      z = self.non_lin(self.layers[i](z))
    return z,y
# Probably better off hardcoding, sanity check and because the layers are designed with the dataset in mind

In [None]:
device = torch.device('cuda:1')
num_epochs = 50

learning_rate = 0.1

In [None]:
# Defining loss and optimiser
torch.manual_seed(0)
m = 3
Inside = [50]*m
Inside.insert(0,(28*28))
Inside.append(10)
Inside = [(3*32*32),1536,768,384,192,96,10] # [128,10] good, [128,64, 32,10]] also gets to 87% in 15 epochs, SSL and DML drop 10% accuracy for uncorrupted data for first class at that size
# For 0.5 corruption SS does a lot better, 55% for vanilla and DML, 75% for SS
# 32,10 suggested here: https://www.kaggle.com/code/pavansanagapati/a-simple-cnn-model-beginner-guide
net_safe = MLP(Inside)
net2_safe = MLP(Inside)

net_safe.rot_pred  = nn.Linear(96, 4)
net2_safe.rot_pred = nn.Linear(96, 4)

loss_fn = nn.CrossEntropyLoss() # Also an nn.module

In [None]:
DML = [0,1,0,1,0]
SSL = [0,0,1,1,1]
DML_SSL = [0,0,0,0,1]
Type = ['Unchanged','DML','SSL','Out','In']

In [None]:
All_accs = []
for k in range(3):
    for i in range(3): # types of robustness
        
        test_rob_transform = transforms.Compose([transforms.RandomHorizontalFlip(),
                        transforms.ToTensor()])
            
        test_rob_dataset = torchvision.datasets.CIFAR10(root='data/',
                                            train=False,#need comma here to get transforms back
                                            transform=test_rob_transform)    
                
                
        test_rob_loader = torch.utils.data.DataLoader(dataset=test_rob_dataset,
                                            batch_size=128, 
                                            shuffle=False, num_workers = num_workers)
            
        for t in range(5):
            j = t+2
            
            
            net = copy.deepcopy(net_safe)
            net2 = copy.deepcopy(net2_safe)
            
            net.to(device)
            net2.to(device)
            
            
            
            optimizer = torch.optim.SGD(list(net.parameters()) +list(net2.parameters()), lr= learning_rate) # An optimiser object
            scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, patience = 3)
              
            loss = np.array([])
            acc = np.array([])
              
            for epoch in range(num_epochs):
                  
                begin_epoch = time.time()\
                  
                new_train_loss = train(P_DML_ss= DML_SSL[j], P_DML = DML[j], P_SS = SSL[j],ss = True)
                
                
                loss = np.append(loss,new_train_loss.cpu().detach().numpy())
                new_acc = test(test_rob_loader)
                acc = np.append(acc, new_acc)
              
                #print('Epoch', epoch, '| Time Spent:', round(time.time() - begin_epoch, 2), 'Train_Loss:',new_train_loss, 'Test_Accuracy1:', new_acc)
            print('Random Horizontal FLip')
            print(Type[j])
            print(acc)
            All_accs.append((acc,Type[j]))
            
with open('Rob.pickle', 'wb') as handle: 
    pickle.dump(All_accs, handle)