In [None]:
# for record-keeping purposes
import time

from comet_ml import Experiment

import torch
from sklearn.model_selection import train_test_split
import torchvision
import torchvision.transforms as transforms
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import matplotlib.pyplot as plt
import numpy as np

%matplotlib inline

## Report the library versions this run was produced with
for label, version in (
    ("Using torch", torch.__version__),
    ("Cuda version is:", torch.version.cuda),
):
    print(label, version)
# print("cuDNN version is :", torch.backends.cudnn.version())

I used Comet ML for ease of graphing. I've also added some matplotlib plots below to cross-check against the Comet charts and found no differences. All testing graphs produced in Comet ML are linked below.

https://www.comet.ml/nguyensome/dl/view/new/panels


In [None]:
# The Comet API key is a credential and must not be committed with the notebook.
# Export COMET_API_KEY before starting Jupyter (or put it in the Comet config file);
# when the variable is unset, `None` is passed and Comet falls back to its own
# configuration lookup. The key that used to be hard-coded here should be revoked.
import os

experiment = Experiment(
    api_key=os.environ.get("COMET_API_KEY"),
    project_name="dl",
    workspace="nguyensome",
)

In [None]:
## Pick the compute device, fix the torch RNG seed and declare the global hyperparameters
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Device", device)
torch.manual_seed(3)

## Values below were fixed after hyperparameter tuning
batch_size, learning_rate = 128, 1e-4
num_epochs, num_classes = 100, 100

1) Load CIFAR100 and split into train, validation, test sets. Dataloader will handle shuffling, batching, etc.

2) Random Erasing selected as suggested by: https://journalofbigdata.springeropen.com/articles/10.1186/s40537-019-0197-0

In [None]:
# Normalisation shared by every split, so the model always sees the same value range.
normalize = transforms.Normalize((0.5, 0.5, 0.4), (0.225, 0.225, 0.225))

# Training pipeline: random augmentation (horizontal flip + random erasing), then normalisation.
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.RandomHorizontalFlip(0.4),
    transforms.RandomErasing(),
    normalize,
])

# Evaluation pipeline: deterministic. Validation and test images must NOT be randomly
# flipped or erased, otherwise the reported accuracy is noisy and biased downwards.
eval_transform = transforms.Compose([
    transforms.ToTensor(),
    normalize,
])

## Load test data
testset = torchvision.datasets.CIFAR100(root='./data', train=False,
                                        download=True, transform=eval_transform)
testloader = torch.utils.data.DataLoader(testset, batch_size=batch_size,
                                         shuffle=False, num_workers=2, pin_memory=True)

## Split training data into training and validation sets 4:1
trainset = torchvision.datasets.CIFAR100(root='./data', train=True,
                                         download=True, transform=transform)
# Second view of the same training images without augmentation; only the
# validation indices are ever read from it.
valset = torchvision.datasets.CIFAR100(root='./data', train=True,
                                       download=True, transform=eval_transform)

# Stratified split so every class keeps the 4:1 ratio. random_state pins the split
# (same value as the torch seed) so reruns train and validate on the same images.
train_indices, val_indices = train_test_split(list(range(len(trainset.targets))),
                                              test_size=0.2, stratify=trainset.targets,
                                              random_state=3)
train = torch.utils.data.Subset(trainset, train_indices)
val = torch.utils.data.Subset(valset, val_indices)

trainloader = torch.utils.data.DataLoader(train, batch_size=batch_size,
                                          shuffle=True, num_workers=2, pin_memory=True)
# Shuffling has no effect on validation metrics, so keep the order fixed.
valloader = torch.utils.data.DataLoader(val, batch_size=batch_size,
                                        shuffle=False, num_workers=2, pin_memory=True)

In [None]:
# Number of mini-batches per epoch in the train, validation and test loaders
for loader in (trainloader, valloader, testloader):
    print(len(loader))

In [None]:
## Function to train barebone model for hyperparameter testing
def trainhyp(model, learning_rate, batch_size, optimizer, num_epoch):
    """Train ``model`` on un-augmented CIFAR-100 for a quick hyperparameter comparison.

    The data is reloaded here (only ``ToTensor``, no augmentation or normalisation)
    because the batch size is one of the hyperparameters being searched.

    Args:
        model: network to train in place; must already be on the global ``device``.
        learning_rate: learning rate passed to the optimizer.
        batch_size: mini-batch size for the train and test loaders.
        optimizer: one of ``"Adam"``, ``"SGD"`` or ``"Adagrad"``. Any other name
            falls back to Adagrad (as before) but now emits a warning.
        num_epoch: number of passes over the training subset.

    Returns:
        A DataLoader over the CIFAR-100 test set, for the caller to evaluate on.
    """
    import warnings

    transform = transforms.Compose([
        transforms.ToTensor(),
    ])

    testset = torchvision.datasets.CIFAR100(root='./data', train=False,
                                            download=True, transform=transform)
    testloader = torch.utils.data.DataLoader(testset, batch_size=batch_size,
                                             shuffle=False, num_workers=2, pin_memory=True)

    trainset = torchvision.datasets.CIFAR100(root='./data', train=True,
                                             download=True, transform=transform)

    # Train on a stratified 80% of the training set. The held-out 20% is never
    # used here, so no validation loader is built. random_state pins the subset so
    # every configuration in a search is trained on exactly the same images.
    train_indices, _ = train_test_split(list(range(len(trainset.targets))),
                                        test_size=0.2, stratify=trainset.targets,
                                        random_state=3)
    train = torch.utils.data.Subset(trainset, train_indices)
    trainloader = torch.utils.data.DataLoader(train, batch_size=batch_size,
                                              shuffle=True, num_workers=2, pin_memory=True)

    criterion = nn.CrossEntropyLoss()

    optimizer_classes = {"Adam": optim.Adam, "SGD": optim.SGD, "Adagrad": optim.Adagrad}
    if optimizer not in optimizer_classes:
        # A silent fallback hides typos in the search space, so say it out loud.
        warnings.warn("Unknown optimizer %r; falling back to Adagrad" % (optimizer,))
    optimizer_cls = optimizer_classes.get(optimizer, optim.Adagrad)
    optimizer = optimizer_cls(model.parameters(), lr=learning_rate, weight_decay=1e-2)

    for epoch in range(num_epoch):
        model.train()

        for images, labels in trainloader:
            images = images.to(device)
            labels = labels.to(device)
            optimizer.zero_grad()

            # forward, loss, backprop
            logits = model(images)
            loss = criterion(logits, labels)
            loss.backward()

            optimizer.step()

    return testloader

In [None]:
def train_lrs(model):
    """Train ``model`` with Adam and a ReduceLROnPlateau schedule, logging to Comet.

    Reads the notebook-level ``trainloader``, ``valloader``, ``device``,
    ``learning_rate``, ``num_epochs`` and ``experiment``, and calls ``evaluate``.

    Args:
        model: network to train in place; must already be on ``device``.

    Returns:
        Four lists with one entry per epoch, in this order:
        ``avg_train_loss, avg_val_loss, val_acc, train_acc``.
    """
    avg_train_loss, avg_val_loss = [], []
    val_acc, train_acc = [], []

    criterion = nn.CrossEntropyLoss().to(device)
    optimizer = optim.Adam(model.parameters(), lr=learning_rate, weight_decay=1e-2)
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer)

    for epoch in range(num_epochs):
        # Per-batch losses of the current epoch only
        train_loss, val_loss = [], []

        model.train()
        for images, labels in trainloader:
            images = images.to(device)
            labels = labels.to(device)
            optimizer.zero_grad()

            # forward, loss, backprop
            logits = model(images)
            loss = criterion(logits, labels)
            experiment.log_metric("Training Loss", loss.item())

            train_loss.append(loss.item())
            loss.backward()

            optimizer.step()

        # Evaluating w/ validation set
        model.eval()
        with torch.no_grad():
            for images, labels in valloader:
                images = images.to(device)
                labels = labels.to(device)
                logits = model(images)

                loss = criterion(logits, labels)
                experiment.log_metric("Validation Loss", loss.item())
                val_loss.append(loss.item())

        train_l = np.average(train_loss)
        val_l = np.average(val_loss)
        avg_train_loss.append(train_l)
        avg_val_loss.append(val_l)

        # Drive the plateau scheduler with the epoch's mean validation loss. The loss
        # of the last validation batch alone is too noisy a signal for lowering the LR.
        scheduler.step(val_l)

        accuracy = evaluate(valloader, model)
        experiment.log_metric("Validation Accuracy", accuracy, epoch=epoch)
        val_acc.append(accuracy)

        acc = evaluate(trainloader, model)
        experiment.log_metric("Training Accuracy", acc, epoch=epoch)
        train_acc.append(acc)

        # Epochs are shown 1-based so the final line reads num_epochs/num_epochs.
        print('Epoch : %d/%d | Train Loss : %.2f | Val Loss : %.2f | Val Acc : %.3f'
              % (epoch + 1, num_epochs, train_l, val_l, accuracy))

    return avg_train_loss, avg_val_loss, val_acc, train_acc

In [None]:
## Accuracy testing regular for training purposes as well as top 1 and 5 for testing purposes
def evaluate(dataset, model):
    """Return the fraction of samples that ``model`` classifies correctly.

    Args:
        dataset: iterable of ``(images, labels)`` batches, e.g. a DataLoader.
        model: classifier returning one row of logits per sample. It is switched
            to eval mode and left there.

    Returns:
        Top-1 accuracy as a Python float in [0, 1]; 0.0 when there are no samples.
    """
    # Send each batch to wherever the model lives; the notebook-level `device`
    # is only consulted for a model without parameters.
    first_param = next(model.parameters(), None)
    target_device = first_param.device if first_param is not None else device

    correct, total = 0, 0

    model.eval()
    with torch.no_grad():
        for images, labels in dataset:
            images = images.to(target_device)
            labels = labels.to(target_device)

            predicted = model(images).argmax(dim=1)
            correct += (predicted == labels).sum().item()
            # Count the samples actually seen, so samplers, drop_last and plain
            # lists of batches all give the right denominator.
            total += labels.size(0)

    return correct / total if total else 0.0

def top_evaluate(dataset, model):
    """Compute and print top-1 and top-5 accuracy of ``model`` over ``dataset``.

    Args:
        dataset: iterable of ``(images, labels)`` batches, e.g. a DataLoader.
        model: classifier returning one row of logits per sample. It is switched
            to eval mode so dropout and batch-norm do not disturb the measurement.

    Returns:
        Top-1 accuracy as a Python float in [0, 1] (0.0 when there are no samples).
        Top-5 accuracy is printed alongside it.
    """
    first_param = next(model.parameters(), None)
    target_device = first_param.device if first_param is not None else device

    top1, top5, count = 0, 0, 0

    model.eval()
    with torch.no_grad():
        for images, labels in dataset:
            images = images.to(target_device)
            labels = labels.to(target_device)
            logits = model(images)

            # topk returns indices sorted by score, so column 0 is the top-1 guess.
            k = min(5, logits.size(1))
            ranked = logits.topk(k, dim=1).indices
            hits = ranked == labels.unsqueeze(1)

            top1 += hits[:, 0].sum().item()
            top5 += hits.any(dim=1).sum().item()
            count += labels.size(0)

    acc_top1 = top1 / count if count else 0.0
    acc_top5 = top5 / count if count else 0.0
    # These values are accuracies; the previous message labelled them as an error rate.
    print("Top-1 accuracy: %.2f | Top-5 accuracy: %.2f" % (acc_top1, acc_top5))

    return acc_top1

In [None]:
## Function to find learning rate
def LRFind(model, learning_rate):
    """Train ``model`` with plain Adam at ``learning_rate`` and track the training loss.

    Runs for the notebook-level ``num_epochs`` over ``trainloader``. Every batch loss
    is logged to the Comet ``experiment`` under a metric named after the learning rate.

    Returns:
        List with the mean training loss of each epoch.
    """
    loss_fn = nn.CrossEntropyLoss()
    adam = optim.Adam(model.parameters(), lr=learning_rate)
    epoch_means = []

    for epoch_idx in range(num_epochs):
        model = model.train()
        # Batch losses of this epoch only
        batch_losses = []

        for images, labels in trainloader:
            images, labels = images.to(device), labels.to(device)
            adam.zero_grad()

            # forward pass and loss, then log before back-propagating
            batch_loss = loss_fn(model(images), labels)
            experiment.log_metric(name=learning_rate, value=batch_loss.item())
            batch_losses.append(batch_loss.item())

            batch_loss.backward()
            adam.step()

        mean_loss = np.average(batch_losses)
        epoch_means.append(mean_loss)

        print('Epoch : %d/%d | Train Loss : %.2f'
              % (epoch_idx, num_epochs, mean_loss))

    return epoch_means

Grid search will find initial hyperparameters for the model. Parameters such as weight decay, momentum, etc. are to be tuned manually afterwards.

In [None]:
## Grid search function over select hyperparameters
def GridSearch(net_type):
    """Exhaustively try learning rate x batch size x optimizer for one architecture.

    Each combination trains a fresh model for 10 epochs with ``trainhyp`` and is
    scored with ``evaluate`` on the test loader that ``trainhyp`` returns.

    Args:
        net_type: ``'LN'`` for ``LinearNet``, ``'CN'`` for ``ConvNet``; anything
            else selects torchvision's GoogLeNet with a ``num_classes``-way head.

    Returns:
        ``(best_key, scores)``: ``scores`` maps ``"<lr><optimizer><batch_size>"``
        to test accuracy, and ``best_key`` is the key with the highest accuracy.
    """
    lrs = [1e-3, 1e-4, 5e-4, 1e-5]
    # "SGD" used to be misspelled "SDG", so trainhyp silently fell back to Adagrad
    # and plain SGD was never part of the search.
    optimizers = ["Adam", "Adagrad", "SGD"]
    batch_sizes = [32, 64, 128]
    epoch = 10

    Dict = {}

    for lr in lrs:
        for batch_size in batch_sizes:
            for opt in optimizers:

                # determine model
                if net_type == 'LN':
                    model = LinearNet()
                    print('LN')
                elif net_type == 'CN':
                    model = ConvNet()
                    print('CN')
                else:
                    print('GN')
                    # Randomly initialised GoogLeNet; every layer stays trainable.
                    # The former loop set `param.grad_requires`, an attribute
                    # autograd never reads, so it froze nothing and is dropped.
                    model = models.googlenet(aux_logits=False)

                    in_features = model.fc.in_features
                    model.fc = nn.Linear(in_features, num_classes, bias=True)

                model = model.to(device)

                # Record keeping dictionary and Comet
                s = ''.join([str(lr), str(opt), str(batch_size)])
                experiment.set_name(s)

                test_loader = trainhyp(model, lr, batch_size, opt, epoch)
                Dict[s] = evaluate(test_loader, model)

    return max(Dict, key=Dict.get), Dict

Part 1: Linear Model. Model has input layer, 1 FC layer, output layer.

In [None]:
class LinearNet(nn.Module):
    """Fully connected baseline: input -> hidden (ReLU) -> class logits.

    Args:
        in_features: flattened input size; defaults to a 3x32x32 image.
        hidden_features: width of the hidden layer; defaults to 32*32*10
            (an earlier, wider experiment used 64*64*10).
        n_classes: number of output logits; defaults to the notebook-level
            ``num_classes``.
    """

    def __init__(self, in_features=32*32*3, hidden_features=32*32*10, n_classes=None):
        super(LinearNet, self).__init__()
        if n_classes is None:
            n_classes = num_classes
        self.layers = nn.Sequential(
            nn.Linear(in_features, hidden_features),  # input layer
            nn.ReLU(),
            nn.Linear(hidden_features, n_classes),
        )

    # Output tensor
    def forward(self, x):
        """Flatten everything but the batch dimension and return the logits."""
        # flatten (unlike view) also accepts non-contiguous inputs
        return self.layers(torch.flatten(x, start_dim=1))

Below are the functions to run either a hyperparameter grid search for the linear net or a learning-rate search. No model initialization is needed beforehand.

In [None]:
# ## LR selection for LN ##
# def LNRateSelection(param):
#     torch.cuda.empty_cache()
#     lr_list = []
#     for p in param:
#         #for each learning rate, instantiate a new model
#         modelLN = LinearNet()
#         modelLN = modelLN.to(device)
#         train_loss = LRFind(modelLN, p)
#         lr_list.append(train_loss)
    
#     return lr_list

# # Testing LR hyperparameters
# parameters = [1e-3, 1e-4, 1e-5, 1e-6, 1e-7]

# LNArray = LNRateSelection(parameters)

Optimal learning rate selected to be 1e-4 for Linear Network. 

In [None]:
# %%time
# value, Dict = GridSearch('LN')

In [None]:
# print(value)
# print(Dict[value])

In [None]:
%%time
## Approximately 45mins for 100 epochs

# Build the linear baseline, move it to the selected device and print its layers.
modelLN = LinearNet()
modelLN = modelLN.to(device)
print(modelLN)
# train_lrs returns per-epoch lists in this order:
# a = mean train loss, b = mean val loss, c = val accuracy, d = train accuracy
a, b, c, d = train_lrs(modelLN)

Sample of an accuracy plot using matplotlib below for demonstration purposes.

In [None]:
# plt.plot(c, label = 'Validation Accuracy')
# plt.plot(d, label = 'Training Accuracy')
# plt.legend()
# plt.show()

After 100 epochs the final model reached about 22% accuracy on the test set and 26% on the training set.

In [None]:
## Final metrics for the linear model: test split first, then the training split ##
test_acc, train_acc = (evaluate(loader, modelLN) for loader in (testloader, trainloader))
print(test_acc, train_acc)

_ = top_evaluate(testloader, modelLN)

Run these two blocks to start a new Comet experiment for the next model.

In [None]:
# del modelLN
# experiment.end()

In [None]:
# import os
# experiment = Experiment(
#     api_key=os.environ.get("COMET_API_KEY"),
#     project_name="dl",
#     workspace="nguyensome",
# )

Part 2: CNN with 6 layers. 

In [None]:
## CNN attempt 2 LeNet5 reference
## conv default stride = 1, padding  = 0
class ConvNet(nn.Module):
    """Small CNN: two conv blocks (2 conv layers each) followed by two linear layers.

    Args:
        n_classes: number of output logits; defaults to the notebook-level
            ``num_classes``.

    The classifier head is sized for 3x32x32 inputs.
    """

    def __init__(self, n_classes=None):
        super(ConvNet, self).__init__()
        if n_classes is None:
            n_classes = num_classes

        self.CLayers = nn.Sequential(
            # Calculations *Andrew Ng Youtube:
            # https://www.youtube.com/watch?v=3PyJA9AfwSk
            # [(n + 2p -f)/s] +1

            # Convolution block one with 2 layers: 32 -> 30 -> 28, pooled to 14
            nn.Conv2d(in_channels=3, out_channels=32, kernel_size=3),
            nn.BatchNorm2d(32),
            nn.ELU(),
            nn.Conv2d(in_channels=32, out_channels=32, kernel_size=3),
            nn.ELU(),
            nn.MaxPool2d(kernel_size=2, stride=2, ceil_mode=True),
            nn.Dropout(0.5),

            # Convolution block two with 2 layers: 14 -> 12 -> 10, pooled to 5
            nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3),
            nn.BatchNorm2d(64),
            nn.ELU(),
            nn.Conv2d(in_channels=64, out_channels=64, kernel_size=3),
            nn.ELU(),
            nn.MaxPool2d(kernel_size=2, stride=2, ceil_mode=True),
            nn.Dropout(0.5),
        )

        self.LinLayers = nn.Sequential(
            # 64 feature maps of 5x5 leave the conv stack. The old in_features of
            # 3072 matched the raw image, which was being fed straight to this layer.
            nn.Linear(64 * 5 * 5, 512),
            nn.ELU(),
            # add dropout before final layer to deal with overfitting
            # promote independence between feature maps
            nn.Dropout(0.25),
            nn.Linear(512, n_classes),
        )

    # Output tensor
    def forward(self, x):
        """Run the conv stack, flatten the feature maps and return the logits."""
        # Keep the conv output. It used to be discarded, so the convolutional
        # layers never took part in the forward pass or received gradients.
        x = self.CLayers(x)
        x = torch.flatten(x, start_dim=1)
        return self.LinLayers(x)

Below are the functions to run either a hyperparameter grid search for ConvNet or a learning-rate search. No model initialization is needed beforehand.

In [None]:
# %%time
# Cvalue, CDict = GridSearch('CN')
# print(Cvalue)
# print(CDict[Cvalue])

In [None]:
# # LR selection for CNN
# def CNRateSelection(param):
#     lr_list = []
#     for p in param:
#         #for each learning rate, instantiate a new model
#         model = ConvNet()
#         model = model.to(device)
#         train_loss = LRFind(model, p)
#         lr_list.append(train_loss)
    
#     return lr_list

# # Testing LR hyperparameters
# parameters = [1e-3, 5e-3, 1e-4, 5e-4, 1e-5, 5e-5, 1e-6]

# CNArray = CNRateSelection(parameters)

In [None]:
# Create the CNN on the selected device and show its layer summary
model = ConvNet().to(device)
print(model)

In [None]:
%%time
## ~45mins for 100 epochs
# NOTE(review): `train` is not defined anywhere in the visible notebook; the line
# below is kept only as a record of an earlier call.
# w,x,y,z = train(model, 1e-4)
# train_lrs returns per-epoch lists in this order:
# w = mean train loss, x = mean val loss, y = val accuracy, z = train accuracy
w,x,y,z = train_lrs(model)

Barebone accuracy of 26% on 20 epochs. Accuracy of 19.6 test and 22.4 training with data augmentation and learning rate scheduler.

In [None]:
## Final metrics for the CNN: test split first, then the training split ##
test_acc, train_acc = (evaluate(loader, model) for loader in (testloader, trainloader))
print(test_acc, train_acc)

_ = top_evaluate(testloader, model)

In [None]:
# del model
# experiment.end()

In [None]:
# import os
# experiment = Experiment(
#     api_key=os.environ.get("COMET_API_KEY"),
#     project_name="dl",
#     workspace="nguyensome",
# )

Part 3: GoogLeNet - torchvision model

In [None]:
import torchvision.models as models

In [None]:
# %%time
# Gvalue, GDict = GridSearch('GN')
# print(Gvalue)
# print(GDict[Gvalue])

In [None]:
# torchvision GoogLeNet without auxiliary classifiers. No pretrained weights are
# requested, so every layer has to stay trainable.
modelGLN = models.googlenet(aux_logits = False)
# The loop that used to follow set `param.grad_requires = False`. Autograd never
# reads that attribute (the real flag is `requires_grad`), so it froze nothing and
# the whole network was trained. It is removed rather than "corrected": freezing
# a randomly initialised backbone would leave only the new head learning.

# Swap the final classifier for one with `num_classes` outputs.
in_features = modelGLN.fc.in_features
modelGLN.fc = nn.Linear(in_features, num_classes, bias=True)
modelGLN = modelGLN.to(device)
print(modelGLN)

Best rate found to be 1e-4

In [None]:
# # LR selection for GLN
# def GLNRateSelection(param):
#     lr_list = []
#     for p in param:
#         #for each learning rate, instantiate a new model
#         modelGLN = models.googlenet(aux_logits = False)
#         for param in modelGLN.parameters():
#             param.grad_requires = False

#         in_features = modelGLN.fc.in_features
#         modelGLN.fc = nn.Linear(in_features, 100, bias=True)
#         modelGLN = modelGLN.to(device)
#         train_loss = LRFind(modelGLN, p)
#         lr_list.append(train_loss)
    
#     return lr_list
# params = [1e-2, 1e-3, 1e-4, 5e-4, 1e-5, 5e-5, 1e-6]
# # Testing LR hyperparameters
# GLNArray = GLNRateSelection(params)

In [None]:
%%time
## One hour~ for 100 Epochs
# NOTE(review): `train` is not defined anywhere in the visible notebook; the line
# below is kept only as a record of an earlier call.
# w,x,y,z = train(modelGLN, 1e-4)
# train_lrs returns per-epoch lists in this order:
# w = mean train loss, x = mean val loss, y = val accuracy, z = train accuracy
w,x,y,z = train_lrs(modelGLN)

Barebone accuracy of 35% on 20 epochs. With only random erasing, accuracy was 0.371 on test and 0.480 on train, so there are still some overfitting issues here. After adding a second data augmentation method, the results were 0.37 and 0.42 on test and train respectively.

Model significantly overfits after 50 epochs with 47.5% testing and 76.5% training accuracy.

In [None]:
## Final metrics for GoogLeNet: test split first, then the training split ##
test_acc, train_acc = (evaluate(loader, modelGLN) for loader in (testloader, trainloader))
print(test_acc, train_acc)

_ = top_evaluate(testloader, modelGLN)

In [None]:
# del modelGLN

In [None]:
# Perturbation strengths tried by the FGSM attack (0 = no attack, used as the baseline)
epsilons = [0, .05, .1, .15, .2, .25, .3]

# CIFAR-100 fine-label names in label-index order (alphabetical), so that
# classes[label] is the name of the class the dataset encodes as `label`.
# The previous tuple was grouped by super-class, which made every lookup by
# label index return the wrong name.
classes = (
    'apple', 'aquarium_fish', 'baby', 'bear', 'beaver',
    'bed', 'bee', 'beetle', 'bicycle', 'bottle',
    'bowl', 'boy', 'bridge', 'bus', 'butterfly',
    'camel', 'can', 'castle', 'caterpillar', 'cattle',
    'chair', 'chimpanzee', 'clock', 'cloud', 'cockroach',
    'couch', 'crab', 'crocodile', 'cup', 'dinosaur',
    'dolphin', 'elephant', 'flatfish', 'forest', 'fox',
    'girl', 'hamster', 'house', 'kangaroo', 'keyboard',
    'lamp', 'lawn_mower', 'leopard', 'lion', 'lizard',
    'lobster', 'man', 'maple_tree', 'motorcycle', 'mountain',
    'mouse', 'mushroom', 'oak_tree', 'orange', 'orchid',
    'otter', 'palm_tree', 'pear', 'pickup_truck', 'pine_tree',
    'plain', 'plate', 'poppy', 'porcupine', 'possum',
    'rabbit', 'raccoon', 'ray', 'road', 'rocket',
    'rose', 'sea', 'seal', 'shark', 'shrew',
    'skunk', 'skyscraper', 'snail', 'snake', 'spider',
    'squirrel', 'streetcar', 'sunflower', 'sweet_pepper', 'table',
    'tank', 'telephone', 'television', 'tiger', 'tractor',
    'train', 'trout', 'tulip', 'turtle', 'wardrobe',
    'whale', 'willow_tree', 'wolf', 'woman', 'worm',
)

In [None]:
# Statistics used by the data pipeline's transforms.Normalize. They are needed to
# map model inputs back to [0, 1] pixel space and must be kept in sync with it.
_NORM_MEAN = (0.5, 0.5, 0.4)
_NORM_STD = (0.225, 0.225, 0.225)


def _channel_stats(reference, mean, std):
    """Return ``mean`` and ``std`` as (1, C, 1, 1) tensors on ``reference``'s dtype and device."""
    mean_t = torch.as_tensor(mean, dtype=reference.dtype, device=reference.device).view(1, -1, 1, 1)
    std_t = torch.as_tensor(std, dtype=reference.dtype, device=reference.device).view(1, -1, 1, 1)
    return mean_t, std_t


def _to_pixels(batch, mean=_NORM_MEAN, std=_NORM_STD):
    """Undo the normalisation of ``batch`` and return a numpy array of [0, 1] pixels."""
    mean_t, std_t = _channel_stats(batch, mean, std)
    return (batch.detach() * std_t + mean_t).clamp(0, 1).cpu().numpy()


def fgsm_attack(model, image, target, epsilon, mean=_NORM_MEAN, std=_NORM_STD):
    """Fast Gradient Sign Method on a batch of normalised images.

    Each sample the model currently classifies correctly is moved by ``epsilon``
    (in [0, 1] pixel units) along the sign of the loss gradient. Samples that are
    already misclassified are returned unchanged.

    The step and the clamp are done in pixel space: the batch is de-normalised,
    perturbed, clamped to [0, 1] and normalised again. Clamping the normalised
    tensor to [0, 1], as was done before, destroyed every negative value and
    altered the images even for ``epsilon == 0``.

    Args:
        model: classifier returning logits of shape (batch, classes).
        image: normalised input batch of shape (batch, C, H, W). It is not
            modified and is not switched to ``requires_grad``.
        target: ground-truth labels of shape (batch,).
        epsilon: perturbation size in pixel units.
        mean, std: per-channel statistics of the normalisation applied to ``image``.

    Returns:
        Detached tensor shaped like ``image`` holding the (normalised) adversarial batch.
    """
    mean_t, std_t = _channel_stats(image, mean, std)
    clean = image.detach()

    # Differentiate with respect to a private copy of the input.
    adv_input = clean.clone().requires_grad_(True)
    output = model(adv_input)

    # Ignore incorrect predictions, decided per sample rather than only for sample 0
    attackable = output.argmax(dim=1) == target
    if not attackable.any():
        return clean

    # Negative likelihood loss used by FGSM
    loss = F.nll_loss(F.log_softmax(output, dim=1), target)
    # autograd.grad returns only the input gradient and leaves the .grad of the
    # model parameters untouched.
    data_grad = torch.autograd.grad(loss, adv_input)[0]
    # sign of gradient needed to know which directions to move (std > 0, so the
    # sign is the same in pixel space and in normalised space)
    sign_data_grad = data_grad.sign()

    pixels = clean * std_t + mean_t
    perturbed_pixels = torch.clamp(pixels + epsilon * sign_data_grad, 0, 1)
    perturbed_image = (perturbed_pixels - mean_t) / std_t

    return torch.where(attackable.view(-1, 1, 1, 1), perturbed_image, clean)


def attack(model, epsilon):
    """Measure the accuracy of ``model`` on ``testloader`` under an FGSM attack.

    Every sample of every batch is attacked and counted. Previously only the
    first sample of each batch was inspected, and the count was divided by the
    number of batches.

    Args:
        model: classifier on the notebook-level ``device``; switched to eval mode.
        epsilon: FGSM perturbation size in pixel units.

    Returns:
        ``(final_acc, adversary, original)``:
        ``final_acc`` is the accuracy after the attack, as a float;
        ``adversary`` holds up to 5 tuples ``(clean_pred, adversarial_pred, example)``;
        ``original`` holds the matching ``(label, clean_pred, example)`` tuples.
        Each ``example`` is a numpy array of shape (1, C, H, W) in [0, 1] pixel
        space, so ``example[0]`` is one image in channel-first layout. For
        ``epsilon == 0`` the first samples are kept whatever their outcome;
        otherwise only samples that end up misclassified are kept.
    """
    max_examples = 5
    correct, total = 0, 0
    adversary = []
    original = []

    model.eval()
    for image, target in testloader:
        image = image.to(device)
        target = target.to(device)

        # Prediction on the clean batch (no graph needed)
        with torch.no_grad():
            pred = model(image).argmax(dim=1)

        perturbed_data = fgsm_attack(model, image, target, epsilon=epsilon)

        # Classify perturbed image
        with torch.no_grad():
            final_pred = model(perturbed_data).argmax(dim=1)

        survived = final_pred == target
        correct += survived.sum().item()
        total += target.size(0)

        if len(adversary) < max_examples:
            if epsilon == 0:
                candidates = range(target.size(0))
            else:
                candidates = (~survived).nonzero(as_tuple=True)[0].tolist()
            for idx in candidates:
                if len(adversary) >= max_examples:
                    break
                adv_ex = _to_pixels(perturbed_data[idx:idx + 1])
                ori_ex = _to_pixels(image[idx:idx + 1])
                adversary.append((pred[idx].item(), final_pred[idx].item(), adv_ex))
                original.append((target[idx].item(), pred[idx].item(), ori_ex))

    # Calculate final accuracy for this epsilon
    final_acc = correct / total if total else 0.0
    print("Epsilon: {} | Test Accuracy = {} / {} = {}".format(epsilon, correct, total, final_acc))

    return final_acc, adversary, original

In [None]:
# FGSM sweep: attack the trained GoogLeNet once per epsilon and collect, per epsilon,
# the resulting accuracy plus the adversarial and original example lists
accuracies, adversary, original = [], [], []

for eps in epsilons:
    acc, adv, orig = attack(modelGLN, eps)
    accuracies += [acc]
    adversary += [adv]
    original += [orig]

In [None]:
# Accuracy after the attack as a function of epsilon
fig, ax = plt.subplots(figsize=(5,5))
ax.plot(epsilons, accuracies, "*-")
ax.set_yticks(np.arange(0, 1.1, step=0.1))
ax.set_xticks(np.arange(0, .35, step=0.05))
ax.set_title("CIFAR Model Accuracy vs Epsilon")
ax.set_xlabel("Epsilon")
ax.set_ylabel("Accuracy")
plt.show()

In [None]:
# Plot several adversarial samples at each epsilon for the FGSM attack.
# One row per epsilon, one column per stored example.
n_rows = len(epsilons)
# Rows may hold different numbers of examples, so size the grid by the longest one.
n_cols = max((len(row) for row in adversary), default=0)

if n_cols:
    plt.figure(figsize=(8,20))
    for i, (eps, row) in enumerate(zip(epsilons, adversary)):
        # Unpack example j itself; every column used to redraw row[0].
        for j, (orig, adv, ex) in enumerate(row):
            # Place the panel by (row, column) so a short row cannot shift later rows.
            plt.subplot(n_rows, n_cols, i * n_cols + j + 1)
            plt.xticks([], [])
            plt.yticks([], [])
            if j == 0:
                plt.ylabel("Eps: {}".format(eps), fontsize=14)
            plt.title("{} -> {}".format(classes[orig], classes[adv]))
            # ex[0] is one image in channel-first layout; imshow wants channels last.
            plt.imshow(ex[0].transpose(1,2,0))
    plt.tight_layout()
    plt.show()