In [0]:
# imports

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader

import torchvision
from torchvision import transforms
from torchvision import datasets, models, transforms

import PIL
from PIL import Image
import math
import random
import seaborn as sn
import pandas as pd
import numpy as np
from pathlib import Path
from skimage import io
import pickle
import matplotlib.pyplot as plt
import time
import os
import copy
from tqdm.notebook import tqdm

from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split

In [0]:
# device: use the first GPU when one is available, otherwise fall back to the
# CPU so the notebook can still be executed on a CPU-only runtime

device = 'cuda:0' if torch.cuda.is_available() else 'cpu'

In [0]:
# The dataset is stored on Google Drive, so the drive has to be mounted into
# the Colab filesystem (at /content/drive) before any path below is usable.
# This cell only works inside Google Colab.

from google.colab import drive
drive.mount('/content/drive')

In [0]:
# Work from the project folder: later cells read and write files through
# relative paths ('label_encoder.pkl', 'Simpsons_non_robust.pt', `import utils`).
# os.chdir is used instead of the bare `cd` automagic so the cell does not
# depend on IPython's automagic setting or on quoting of the spaces in the path.
os.chdir('/content/drive/My Drive/Project DL')

In [0]:
# list the working directory to check that the drive is mounted and the project files are visible
ls

In [0]:
# Only needed when the dataset was uploaded as a zip archive: unpacking it
# creates the directory layout that the path definitions further down expect.

import zipfile

zip_source = '/content/drive/My Drive/Project DL/simpsons/simpsons.zip'
zip_target = '/content/drive/My Drive/Project DL/simpsons/simpsons'
with zipfile.ZipFile(zip_source, 'r') as zip_ref:
    zip_ref.extractall(zip_target)

In [0]:
# Image locations: labelled training images (one sub-directory per character)
# and the Kaggle test set.

_simpsons_root = Path('/content/drive/My Drive/Project DL/simpsons/simpsons')
train_dir = _simpsons_root / 'simpsons_dataset'
test_dir = _simpsons_root / 'kaggle_simpson_testset'

In [0]:
# class to help 

class SimpsonTrainValPath():
    """Collect the image paths of the dataset and split them into train/val.

    The label of every training image is the name of its parent directory.
    """

    def __init__(self, train_dir, test_dir):
        """Scan both directories recursively for ``*.jpg`` files.

        Args:
            train_dir: ``pathlib.Path`` with one sub-directory per class.
            test_dir: ``pathlib.Path`` with the test images.
        """
        self.train_dir = train_dir
        self.test_dir = test_dir
        # Sorted so the file order does not depend on filesystem enumeration
        # order (needed for a seeded split to be repeatable).
        self.train_val_files_path = sorted(self.train_dir.rglob('*.jpg'))
        self.test_path = sorted(self.test_dir.rglob('*.jpg'))
        self.train_val_labels = [path.parent.name for path in self.train_val_files_path]

    def get_path(self, test_size=0.3, random_state=None):
        """Return ``({'train': [...], 'val': [...]}, test_paths)``.

        The split is stratified by label. Every call draws a new split unless
        ``random_state`` is given.

        Args:
            test_size: fraction (or count) of files put into the validation
                part; forwarded to ``train_test_split``.
            random_state: optional seed forwarded to ``train_test_split`` to
                make the split reproducible. ``None`` keeps it random.
        """
        train_files_path, val_files_path = train_test_split(
            self.train_val_files_path,
            test_size=test_size,
            stratify=self.train_val_labels,
            random_state=random_state,
        )

        files_path = {'train': train_files_path, 'val': val_files_path}

        return files_path, self.test_path

    def get_n_classes(self):
        """Return the number of distinct labels among the train/val files."""
        return len(set(self.train_val_labels))

In [0]:
# get number of classes and paths to images
# The helper instance gets its own name: binding it to `SimpsonTrainValPath`
# would shadow the class and make this cell fail when it is run a second time.
# `train_path` is a dict with the keys 'train' and 'val'.

simpson_paths = SimpsonTrainValPath(train_dir, test_dir)
train_path, test_path = simpson_paths.get_path()
n_classes = simpson_paths.get_n_classes()

In [0]:
# Sanity check: the number of classes found on disk should be 42.

n_classes 

In [0]:
# train loop in function

def train_model(model, dataloaders, criterion, optimizer, save_best_weights_path, save_last_weights_path, best_acc, num_epochs=25, is_inception=False, scheduler=None):
    """Train ``model`` and evaluate it on the validation set after every epoch.

    Args:
        model: network to train; inputs are moved to the notebook-level ``device``.
        dataloaders: dict with the keys ``'train'`` and ``'val'``.
        criterion: loss function applied to ``(outputs, labels)``.
        optimizer: optimizer that is stepped after every training batch.
        save_best_weights_path: accepted for interface compatibility; not used.
        save_last_weights_path: accepted for interface compatibility; not used.
        best_acc: accepted for interface compatibility. NOTE(review): the value
            passed in is ignored - tracking restarts at 0.0 on every call, so
            the returned ``best_acc`` is the best of *this* call only. This is
            kept as is because honouring it would change which weights are
            restored at the end of the call.
        num_epochs: number of epochs to run.
        is_inception: if True the model is expected to return
            ``(outputs, aux_outputs)`` in training mode.
        scheduler: learning-rate scheduler stepped once per training batch.
            When omitted, the notebook-level ``scheduler`` is used if it exists
            (this is what the function silently relied on before).

    Returns:
        ``(model, history_val, history_train, time_elapsed, lr_find_lr, best_acc)``
        where the histories are dicts with per-epoch ``'loss'`` and ``'acc'``
        lists of Python floats, ``lr_find_lr`` holds the learning rate after
        every training step, and ``model`` carries the weights of the epoch
        with the best validation accuracy of this call.
    """
    if scheduler is None:
        # Backward compatibility for callers that do not pass a scheduler.
        scheduler = globals().get('scheduler')

    since = time.time()

    val_acc_history = []
    val_loss_history = []
    train_acc_history = []
    train_loss_history = []
    lr_find_lr = []

    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0

    for epoch in range(num_epochs):
        print('Epoch {}/{}'.format(epoch, num_epochs - 1))
        print('-' * 10)

        for phase in ['train', 'val']:
            if phase == 'train':
                model.train()
            else:
                model.eval()

            running_loss = 0.0
            running_corrects = 0

            for inputs, labels in tqdm(dataloaders[phase]):
                inputs = inputs.to(device)
                labels = labels.to(device)

                optimizer.zero_grad()

                # Gradients are only tracked during the training phase.
                with torch.set_grad_enabled(phase == 'train'):
                    if is_inception and phase == 'train':
                        outputs, aux_outputs = model(inputs)
                        loss1 = criterion(outputs, labels)
                        loss2 = criterion(aux_outputs, labels)
                        loss = loss1 + 0.4*loss2
                    else:
                        outputs = model(inputs)
                        loss = criterion(outputs, labels)

                    _, preds = torch.max(outputs, 1)

                    if phase == 'train':
                        loss.backward()
                        optimizer.step()
                        if scheduler is not None:
                            scheduler.step()
                        # Read the learning rate from the optimizer that was
                        # passed in, not from a notebook-level variable.
                        lr_find_lr.append(optimizer.param_groups[0]["lr"])

                # loss.item() is the batch mean, so weight it by the batch size.
                running_loss += loss.item() * inputs.size(0)
                # .item() keeps the counter a plain int, so the histories hold
                # Python floats that can be plotted directly.
                running_corrects += torch.sum(preds == labels.data).item()

            n_samples = len(dataloaders[phase].dataset)
            epoch_loss = running_loss / n_samples
            epoch_acc = running_corrects / n_samples

            print('{} Loss: {:.4f} Acc: {:.4f}'.format(phase, epoch_loss, epoch_acc))

            if phase == 'val' and epoch_acc > best_acc:
                best_acc = epoch_acc
                best_model_wts = copy.deepcopy(model.state_dict())
            if phase == 'val':
                val_acc_history.append(epoch_acc)
                val_loss_history.append(epoch_loss)
            else:
                train_acc_history.append(epoch_acc)
                train_loss_history.append(epoch_loss)

        print()
    time_elapsed = time.time() - since
    print('Training complete in {:.0f}m {:.0f}s'.format(time_elapsed // 60, time_elapsed % 60))
    print('Best val Acc: {:4f}'.format(best_acc))

    # Hand back the weights of the best validation epoch of this call.
    model.load_state_dict(best_model_wts)

    history_val = {'loss': val_loss_history, 'acc': val_acc_history}
    history_train = {'loss': train_loss_history, 'acc': train_acc_history}

    return model, history_val, history_train, time_elapsed, lr_find_lr, best_acc

In [0]:
# freeze all weights when the network is used as a fixed feature extractor

def set_parameter_requires_grad(model, feature_extracting):
    """Turn off gradient tracking for every parameter of ``model``.

    Nothing is changed when ``feature_extracting`` is falsy.
    """
    if not feature_extracting:
        return
    for weight in model.parameters():
        weight.requires_grad = False

In [0]:
# initialize parameters of input model

def initialize_model(model_name, num_classes, feature_extract, use_pretrained=True):
    """Build a classifier for ``num_classes`` classes.

    Args:
        model_name: architecture name; only ``"resnet152"`` is supported.
        num_classes: number of output units of the replaced final layer.
        feature_extract: if True, all backbone weights are frozen first (the
            newly created final layer stays trainable).
        use_pretrained: forwarded to torchvision as ``pretrained``.

    Returns:
        ``(model, input_size)`` where ``input_size`` is the image side length
        used for resizing/cropping (224).

    Raises:
        ValueError: for an unsupported ``model_name``. An exception is raised
            instead of calling ``exit()``, which would stop the kernel.
    """
    if model_name != "resnet152":
        raise ValueError(
            "Invalid model name {!r}; supported: 'resnet152'".format(model_name)
        )

    model_ft = models.resnet152(pretrained=use_pretrained)
    set_parameter_requires_grad(model_ft, feature_extract)
    # Swap the final fully connected layer for one with the right output size.
    num_ftrs = model_ft.fc.in_features
    model_ft.fc = nn.Linear(num_ftrs, num_classes)
    input_size = 224

    return model_ft, input_size

In [0]:
# Simpsons dataset

class SimpsonsDataset(Dataset):
    """Image dataset built from a list of file paths.

    Labelled mode (train/val): items are ``(image, label)`` where the label is
    the integer code of the parent directory name.
    Test mode: chosen when the first path contains the substring ``'test'``;
    items are just the transformed image.

    Side effect: in labelled mode the fitted ``LabelEncoder`` is written to
    ``label_encoder.pkl`` in the current working directory each time a dataset
    is constructed.
    NOTE(review): every labelled instance fits its own encoder, so train and
    val codes only agree if both splits contain the same set of classes.
    """

    def __init__(self, files_path, data_transforms):
        self.files_path = files_path
        self.transform = data_transforms
        # Decide the mode once instead of re-deriving it for every item.
        self.is_test = 'test' in str(self.files_path[0])

        if not self.is_test:
            self.labels = [path.parent.name for path in self.files_path]
            self.label_encoder = LabelEncoder()
            self.label_encoder.fit(self.labels)
            # Encode all labels up front; calling transform() per item is slow.
            self.encoded_labels = self.label_encoder.transform(self.labels)

            with open('label_encoder.pkl', 'wb') as le_dump_file:
                pickle.dump(self.label_encoder, le_dump_file)

    def __len__(self):
        return len(self.files_path)

    def __getitem__(self, idx):
        img_path = str(self.files_path[idx])
        # Force three channels: grayscale or RGBA files would otherwise break
        # the 3-channel normalisation in the transform pipeline.
        image = Image.open(img_path).convert('RGB')
        image = self.transform(image)

        if self.is_test:
            return image

        return image, int(self.encoded_labels[idx])

In [0]:
# Experiment configuration

model_name = 'resnet152'

# free-form tag for the training recipe; only used to build the weight file names
fc_layer = 'all-st-SGD-m.9-nest-s-cycle-exp-.00001-.05-g.99994-m.8-.9'

num_classes = n_classes
batch_size = 32
num_epochs = 2

# False: every layer is trainable; True: the backbone is frozen
feature_extract = False

# NOTE(review): these point to /kaggle/working although the rest of the
# notebook runs on Colab - confirm before actually writing to them.
save_last_weights_path = '/kaggle/working/{}-{}_last_weights.pth'.format(model_name, fc_layer)
save_best_weights_path = '/kaggle/working/{}-{}_best_weights.pth'.format(model_name, fc_layer)

In [0]:
# Build the network with a `num_classes`-way output layer and move it to `device`.
# `input_size` is the image side length used by the transforms defined below.
model_ft, input_size = initialize_model(model_name, num_classes, feature_extract, use_pretrained=True)
model_ft = model_ft.to(device)

In [0]:
# list the working directory (the weights file loaded in the next cell should be here)
ls

In [0]:
PATH_W = '/content/drive/My Drive/Project DL/sympsons.pth'
# map_location makes a checkpoint that was saved on a GPU loadable on whatever
# `device` is in use (including CPU-only runtimes).
model_ft.load_state_dict(torch.load(PATH_W, map_location=device)) # path to weights 

In [0]:
# Training-time augmentation: exactly one of the listed options is picked per
# image (a coin-flip mirror, a contrast or brightness jitter, or - with
# probability 0.5 - a mirror combined with one of the jitters).
train_augmentation = transforms.RandomChoice([
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.ColorJitter(contrast=0.9),
    transforms.ColorJitter(brightness=0.1),
    transforms.RandomApply([transforms.RandomHorizontalFlip(p=1), transforms.ColorJitter(contrast=0.9)], p=0.5),
    transforms.RandomApply([transforms.RandomHorizontalFlip(p=1), transforms.ColorJitter(brightness=0.1)], p=0.5),
])

data_transforms = {}

# train: resize + centre crop to the network input size, augment, normalise
data_transforms['train'] = transforms.Compose([
    transforms.Resize(input_size),
    transforms.CenterCrop(input_size),
    train_augmentation,
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
])

# val: same geometry and normalisation, no augmentation
data_transforms['val'] = transforms.Compose([
    transforms.Resize(input_size),
    transforms.CenterCrop(input_size),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
])

In [0]:
# Labelled datasets for training and validation; the unlabelled test set
# reuses the deterministic validation transforms.
image_datasets = {
    'train': SimpsonsDataset(train_path['train'], data_transforms['train']),
    'val': SimpsonsDataset(train_path['val'], data_transforms['val']),
}
image_datasets_test = SimpsonsDataset(test_path, data_transforms['val'])

In [0]:
# Batch loaders: train and val are shuffled, the test loader keeps file order
# so predictions can be matched back to the files.
dataloaders_dict = {
    split: DataLoader(image_datasets[split], batch_size=batch_size, shuffle=True, num_workers=4)
    for split in ('train', 'val')
}
dataloader_test = DataLoader(image_datasets_test, batch_size=batch_size, shuffle=False, num_workers=4)

In [0]:
def imshow(inp, title=None, plt_ax=plt, default=False):
    """Show a normalised CxHxW image tensor on a matplotlib axes.

    The ImageNet mean/std normalisation applied by the transforms is undone
    and the result is clipped to [0, 1] before plotting.

    Args:
        inp: CPU tensor in channel-first layout.
        title: optional axes title.
        plt_ax: axes to draw on. When left at the default (the ``pyplot``
            module) the current axes are used.
        default: unused; kept for interface compatibility.

    Side effect: the figure that owns the axes is saved to
    ``simpsons_dataset.jpg`` on every call.
    """
    # The pyplot module has no set_title() and its `figure` attribute is a
    # function, so the default argument has to be resolved to a real Axes.
    axes = plt.gca() if plt_ax is plt else plt_ax

    inp = inp.numpy().transpose((1, 2, 0))  # CxHxW -> HxWxC
    mean = np.array([0.485, 0.456, 0.406])
    std = np.array([0.229, 0.224, 0.225])
    inp = std * inp + mean
    inp = np.clip(inp, 0, 1)
    axes.imshow(inp)
    if title is not None:
        axes.set_title(title)
    axes.grid(False)
    axes.figure.savefig('simpsons_dataset.jpg')

In [0]:
# Show a 3x3 grid of random training images with their character names.
fig, ax = plt.subplots(nrows=3, ncols=3,figsize=(8, 8), \
                        sharey=True, sharex=True)
train_ds = image_datasets['train']
for fig_x in ax.flatten():
    # Sample over the whole training set instead of a hard-coded upper bound.
    random_characters = np.random.randint(len(train_ds))
    im_val, label = train_ds[random_characters]

    # Decode with the encoder of the dataset the sample came from and turn
    # e.g. 'bart_simpson' into 'Bart Simpson'.
    img_label = " ".join(map(lambda x: x.capitalize(),\
                train_ds.label_encoder.inverse_transform([label])[0].split('_')))
    # Draw once per axes (drawing twice only repeated the work and the file save).
    imshow(im_val.data.cpu(), \
          title=img_label,plt_ax=fig_x)

In [0]:
def visualization(train, val, is_loss = True):
    """Plot per-epoch training and validation curves.

    Args:
        train: sequence of per-epoch training values.
        val: sequence of per-epoch validation values.
        is_loss: True to label the plot as loss, False to label it as accuracy.

    Values may be Python numbers or one-element tensors (including tensors on
    the GPU); they are converted to floats before plotting.
    """
    if is_loss:
        metric, axis_label = 'loss', 'Loss'
    else:
        metric, axis_label = 'acc', 'Acc'

    # float() also works for 0-d CUDA tensors, which matplotlib cannot convert.
    train_values = [float(value) for value in train]
    val_values = [float(value) for value in val]

    plt.figure(figsize=(17,10))
    plt.plot(train_values, label = 'Training ' + metric)
    plt.plot(val_values, label = 'Val ' + metric)
    plt.title('Training and validation ' + metric)
    plt.xlabel('Epochs')
    plt.ylabel(axis_label)
    plt.legend()
    plt.show()

In [0]:
# Learning-rate range of the cyclic schedule and number of outer training rounds
base_lr = 0.0012
max_lr = 0.0022
num_epoch = 4

criterion = nn.CrossEntropyLoss()
params_to_update = model_ft.parameters()
optimizer_ft = optim.SGD(params_to_update, lr=0.001, momentum=0.9, nesterov = True)

# NOTE(review): len(dataloader) is already the number of batches per epoch;
# dividing by batch_size again makes the cycle much shorter than an epoch -
# confirm that this is intended.
batches_per_epoch = len(dataloaders_dict['train'])
step_size = 2 * math.ceil( batches_per_epoch / batch_size )

# NOTE(review): with mode='exp_range' and no custom scale_fn the scale_mode
# argument is presumably ignored - verify against the CyclicLR documentation.
scheduler = optim.lr_scheduler.CyclicLR(
    optimizer_ft,
    base_lr = base_lr,
    max_lr = max_lr,
    step_size_up = step_size,
    mode = 'exp_range',
    gamma = 0.994,
    scale_mode = 'cycle',
    cycle_momentum = True,
    base_momentum = 0.8,
    max_momentum = 0.9,
    last_epoch = -1,
)

In [0]:
# Histories accumulated over all training rounds
val_loss = []
val_acc = []
train_loss = []
train_acc = []
lr_cycle = []
best_acc = .0

# Datasets and loaders do not change between rounds, so they are built once
# (rebuilding them re-fitted the label encoder and rewrote its pickle file
# every round).
image_datasets = {mode: SimpsonsDataset(train_path[mode], data_transforms[mode]) for mode in ['train', 'val']}

# The training data is reshuffled every epoch; validation order is irrelevant.
dataloaders_dict = {'train': torch.utils.data.DataLoader(image_datasets['train'], batch_size=batch_size, shuffle=True, num_workers=4),
                    'val': torch.utils.data.DataLoader(image_datasets['val'], batch_size=batch_size, shuffle=False, num_workers=4)}

for i in range(num_epoch):
    # One epoch per call; the optimizer and LR scheduler keep their state
    # between calls because they live at notebook level.
    model, history_val, history_train, time_elapsed, lr_find_lr, best_acc = train_model(model_ft, dataloaders_dict, criterion, optimizer_ft, save_best_weights_path, save_last_weights_path, best_acc = best_acc, num_epochs=1, is_inception=(model_name=="inception"))

    val_loss += history_val['loss']
    val_acc += history_val['acc']
    train_loss += history_train['loss']
    train_acc += history_train['acc']
    lr_cycle += lr_find_lr

In [0]:
# accuracy curves collected over the training rounds above
visualization(train_acc, val_acc, is_loss = False)


In [0]:
# loss curves collected over the training rounds above
visualization(train_loss, val_loss, is_loss = True)


In [0]:
def predict(model, test_loader):
    """Return class probabilities for every sample yielded by ``test_loader``.

    Args:
        model: classifier returning one row of logits per input sample.
        test_loader: iterable of input batches (inputs only, no labels).

    Returns:
        NumPy array of shape ``(n_samples, n_classes)`` with softmax
        probabilities, in loader order.
    """
    # Switch to inference mode once, not once per batch.
    model.eval()
    logits = []

    with torch.no_grad():
        for inputs in test_loader:
            inputs = inputs.to(device)
            # Collect on the CPU so GPU memory is not held for the whole set.
            logits.append(model(inputs).cpu())

    probs = nn.functional.softmax(torch.cat(logits), dim=1).numpy()
    return probs

In [0]:
def predict_one_sample(model, img_tensor, device=device):
    """Softmax probabilities for one already-batched input tensor.

    The model is put into eval mode, the input is moved to ``device`` and the
    result is returned as a NumPy array with one row per input sample.
    """
    model.eval()
    with torch.no_grad():
        logits = model(img_tensor.to(device)).cpu()
        return torch.nn.functional.softmax(logits, dim=1).numpy()

In [0]:
def confusion_matrix():
    """Plot the confusion matrix of ``model_ft`` on the validation set.

    Uses the notebook-level ``image_datasets``, ``batch_size``, ``model_ft``
    and ``device``. Class names are read from ``label_encoder.pkl`` in the
    working directory.
    """
    val_loader = torch.utils.data.DataLoader(image_datasets['val'], batch_size=batch_size, shuffle=False, num_workers=4)

    # One pass over the validation set yields both labels and predictions;
    # images are processed batch by batch instead of being held in memory
    # (and are no longer decoded twice).
    actual = []
    preds = []
    model_ft.eval()
    with torch.no_grad():
        for inputs, labels in val_loader:
            logits = model_ft(inputs.to(device))
            # argmax of the logits equals argmax of the softmax probabilities
            preds.extend(logits.argmax(dim=1).cpu().tolist())
            actual.extend(labels.tolist())

    df = pd.DataFrame({'actual': actual, 'preds': preds})

    # NOTE: unpickling executes code from the file - only load a file written
    # by this notebook.
    with open("label_encoder.pkl", 'rb') as le_file:
        label_encoder = pickle.load(le_file)

    yticklabels = label_encoder.classes_

    counts = pd.crosstab(df['actual'], df['preds'], rownames=['Actual'], colnames=['Predicted'], margins = False)
    # Make sure every class has a row and a column; otherwise a class that
    # never occurs (or is never predicted) would shift the tick labels.
    all_codes = range(len(yticklabels))
    counts = counts.reindex(index=all_codes, columns=all_codes, fill_value=0)

    plt.subplots(figsize=(20,20))

    sn.heatmap(counts, annot=True, fmt="d", linewidths=0.5, cmap="YlGnBu", cbar=False, vmax = 30, yticklabels = yticklabels, xticklabels = yticklabels)

In [0]:
# draw the validation confusion matrix for the current model
confusion_matrix()


In [0]:
import matplotlib.patches as patches
from matplotlib.font_manager import FontProperties

# 3x3 grid of random validation images: the title shows the true character,
# the white box shows the predicted character and its probability.
fig, ax = plt.subplots(nrows=3, ncols=3, figsize=(12, 12), \
                        sharey=True, sharex=True)

# NOTE: unpickling executes code from the file - only load a file written by
# this notebook.
with open("label_encoder.pkl", 'rb') as le_file:
    label_encoder = pickle.load(le_file)

val_ds = image_datasets['val']

# The same font is used for every annotation, so build it once.
font = FontProperties()
font.set_family("fantasy")

for fig_x in ax.flatten():
    # Sample over the whole validation set instead of a hard-coded upper bound.
    random_characters = np.random.randint(len(val_ds))
    im_val, label = val_ds[random_characters]

    img_label = " ".join(map(lambda x: x.capitalize(),\
                val_ds.label_encoder.inverse_transform([label])[0].split('_')))

    imshow(im_val.data.cpu(), \
          title=img_label, plt_ax=fig_x)

    # white background box for the prediction text
    fig_x.add_patch(patches.Rectangle((0, 53), 86, 35, color='white'))
    prob_pred = predict_one_sample(model_ft, im_val.unsqueeze(0))
    # probability (in percent) of the most likely class
    predicted_proba = np.max(prob_pred)*100
    y_pred = np.argmax(prob_pred)

    predicted_label = label_encoder.classes_[y_pred]
    # break the class name in the middle so it fits into the box
    predicted_label = predicted_label[:len(predicted_label)//2] + '\n' + predicted_label[len(predicted_label)//2:]
    predicted_text = "{} : {:.0f}%".format(predicted_label,predicted_proba)

    fig_x.text(1, 59, predicted_text , horizontalalignment='left', fontproperties=font,
                    verticalalignment='top',fontsize=8, color='black',fontweight='bold')

Evaluation of an adversarial attack on the Simpsons dataset


In [0]:
import random

class LinfPGDAttack():
    """Targeted L-infinity projected-gradient attack with a random target class."""

    def __init__(self, model2, epsilon, num_steps, step_size, num_classes=42,
                 clip_min=0, clip_max=255):
        """Attack parameter initialization. The attack performs k steps of
        size a, while always staying within epsilon from the initial
        point.

        Args:
            model2: unused; kept for interface compatibility (the model is
                passed to ``perturb``).
            epsilon: maximum per-element distance from the original input.
            num_steps: number of gradient steps.
            step_size: size of every signed-gradient step.
            num_classes: number of classes the random target is drawn from.
            clip_min, clip_max: value range enforced after every step.
                NOTE(review): the defaults keep the original [0, 255] clip,
                but the datasets in this notebook yield mean/std-normalised
                tensors that contain negative values - confirm the intended
                range.
        """
        self.epsilon = epsilon
        self.num_steps = num_steps
        self.step_size = step_size
        self.num_classes = num_classes
        self.clip_min = clip_min
        self.clip_max = clip_max
        # NOTE(review): NLLLoss expects log-probabilities; if the model returns
        # raw scores this is just the negated score of the target class -
        # confirm that this is intended.
        self.loss = nn.NLLLoss()

    def perturb(self, x_nat, y, labels, model1):
        """Given a set of examples (x_nat, y), returns a set of adversarial
        examples within epsilon of x_nat in l_infinity norm.

        A target class ``t`` is drawn uniformly at random and the input is
        moved, step by step, in the direction that lowers the loss for ``t``.

        Args:
            x_nat: batched input tensor.
            y: true label; unused.
            labels: unused; kept for interface compatibility.
            model1: model under attack. Its train/eval mode is not changed.

        Returns:
            ``(x_adv, t)``: the perturbed input on ``device`` and the
            one-element target-class tensor.
        """
        x_orig = x_nat.detach().to(device)
        # epsilon-ball around the original input
        lower = x_orig - self.epsilon
        upper = x_orig + self.epsilon

        t = torch.randint(0, self.num_classes, (1,)).to(device)

        x_adv = x_orig.clone()
        for _ in range(self.num_steps):
            x_adv.requires_grad_(True)
            loss = self.loss(model1(x_adv), t)
            # The graph is rebuilt every step, so it need not be retained.
            grad, = torch.autograd.grad(loss, x_adv)

            # Signed-gradient step towards the target, then project back into
            # the epsilon-ball and into the allowed value range. Everything
            # stays on the device; no NumPy round trip per step.
            x_adv = x_adv.detach() - self.step_size * grad.sign()
            x_adv = torch.min(torch.max(x_adv, lower), upper)
            x_adv = torch.clamp(x_adv, self.clip_min, self.clip_max)

        return x_adv.detach(), t


In [0]:
# Reload the label encoder written by SimpsonsDataset; the context manager
# closes the file again.
# NOTE: unpickling executes code from the file - only load a file written by
# this notebook.
with open("label_encoder.pkl", 'rb') as le_file:
    label_encoder = pickle.load(le_file)

In [0]:
def test_adversarial(model_ft, eps=0.01, num_steps=10, step_size=0.01, plot=False):
    """Attack one random training image and compare the predictions.

    Args:
        model_ft: classifier under attack.
        eps, num_steps, step_size: parameters of ``LinfPGDAttack``.
        plot: if True, show the adversarial image with the true, predicted and
            adversarial ("Fake") class names in the title.

    Returns:
        ``(predicted_label_x, predicted_label_X, y, t)``: predicted class name
        for the clean image, predicted class name for the adversarial image,
        true label code, and the attack's target-class tensor.
    """
    attack = LinfPGDAttack(model_ft, eps, num_steps, step_size)
    # Passed through to perturb(), which does not use it.
    labels = list(range(1, 43))

    rand_idx = np.random.randint(len(image_datasets['train']))
    x, y = image_datasets['train'][rand_idx]

    # Add the batch dimension without hard-coding the image size.
    x = x.unsqueeze(0)
    X, t = attack.perturb(x, y, labels, model_ft)
    # first (only) sample, CxHxW -> HxWxC for plotting
    pic_np = X.detach().cpu().numpy()[0].transpose(1, 2, 0)

    # predict_one_sample moves its input to the right device itself, so no
    # explicit .cuda() is needed (and CPU-only runs keep working).
    prob_pred_X = predict_one_sample(model_ft, X)
    prob_pred_x = predict_one_sample(model_ft, x)

    predicted_proba_X = np.max(prob_pred_X)*100
    predicted_proba_x = np.max(prob_pred_x)*100

    y_pred_X = np.argmax(prob_pred_X)
    y_pred_x = np.argmax(prob_pred_x)

    predicted_label_X = label_encoder.classes_[y_pred_X]
    predicted_label_x = label_encoder.classes_[y_pred_x]

    if plot:
        fig, ax = plt.subplots(1, 1, figsize=(20, 10))
        ax.imshow(pic_np)
        ax.set_title('True: ' + label_encoder.classes_[y] + ' ; Predicted: ' + predicted_label_x + ' ; Fake: ' + predicted_label_X, fontsize = 14)

    return predicted_label_x, predicted_label_X, y, t

In [0]:
# The attack should push the classifier's decision towards the random target
# class t - run it once and compare.

eps = 0.1
num_steps = 50
step_size = 0.01

predicted_label_x, predicted_label_X, y, t = test_adversarial(model_ft, eps, num_steps, step_size, plot=True)
# t is a one-element tensor; .item() turns it into a plain int index instead
# of relying on NumPy's implicit conversion of a (possibly CUDA) tensor.
print('Compare: this one ', predicted_label_X, ' should match ' ,label_encoder.classes_[t.item()]) # matched

In [0]:
# 20 epsilon values starting at 0.05 in steps of 0.01
eps_list = [0.05 + 0.01 * k for k in range(20)]

In [0]:
# Small 1D grid search over epsilon: for every value, attack `Num` random
# training images and record how often
#   - the adversarial prediction equals the target class t (should be close to 1),
#   - the adversarial prediction equals the clean prediction (should decrease
#     with increasing eps),
#   - the clean image itself is misclassified (should be zero).

ratio_predicted_advers_to_t = []
ratio_predicted_x_to_X = []
ratio_misclass = []

Num = 10

num_steps = 50
step_size = 0.01


for eps in tqdm(eps_list):

    N_pred_x_to_X = 0
    N_x_adv_to_t = 0
    N_misclass = 0

    for _ in range(Num):

        predicted_label_x, predicted_label_X, y, t = test_adversarial(model_ft, eps, num_steps, step_size, plot=False)

        # t is a one-element tensor; index the class names with a plain int.
        target_label = label_encoder.classes_[t.item()]
        true_label = label_encoder.classes_[y]

        if predicted_label_X == target_label: N_x_adv_to_t += 1
        if predicted_label_X == predicted_label_x: N_pred_x_to_X += 1
        if predicted_label_x != true_label: N_misclass += 1

    ratio_predicted_advers_to_t.append(N_x_adv_to_t/Num)
    ratio_predicted_x_to_X.append(N_pred_x_to_X/Num)
    ratio_misclass.append(N_misclass/Num)

In [0]:
# One panel per ratio collected by the epsilon grid search.
f, (ax1, ax2, ax3) = plt.subplots(1, 3, figsize = (30, 10))

panels = [
    (ax1, ratio_predicted_advers_to_t, 'Ratio of predicted from adv examples to gen t'),
    (ax2, ratio_predicted_x_to_X, 'Ratio of predicted from adv examples to true inputs'),
    (ax3, ratio_misclass, 'Ratio of misclassified images'),
]

for panel, ratios, heading in panels:
    panel.plot(eps_list, ratios)
    panel.set_title(heading, fontsize = 16)
    panel.set_xlabel(r'$\varepsilon$', fontsize = 18)
    panel.set_ylabel('ratio', fontsize = 18)

plt.show()

In [0]:
def create_dataset_item(model_ft, rand_idx, eps=0.01, num_steps=10, step_size=0.01):
    """Build one adversarial sample from training image number ``rand_idx``.

    Args:
        model_ft: classifier under attack.
        rand_idx: index into ``image_datasets['train']``.
        eps, num_steps, step_size: parameters of ``LinfPGDAttack``.

    Returns:
        ``(pic_np, t, y)``: the adversarial image as a CxHxW NumPy array, the
        target class the attack aimed for (int) and the original label.
    """
    attack = LinfPGDAttack(model_ft, eps, num_steps, step_size)
    # Passed through to perturb(), which does not use it.
    labels = list(range(1, 43))

    x, y = image_datasets['train'][rand_idx]

    # Add the batch dimension without hard-coding the image size.
    x = x.unsqueeze(0)
    X, t = attack.perturb(x, y, labels, model_ft)
    # Drop the batch dimension again; the layout stays CxHxW.
    pic_np = X.detach().cpu().numpy()[0]

    return pic_np, t.item(), y

In [0]:
# generate dataset based on adversarial examples from simpsons dataset with chosen parameters

eps=0.15
num_steps=50
step_size=0.01

# The list is created only if it does not exist yet: on a fresh kernel the
# cell no longer fails with a NameError, while re-running it for another index
# range keeps appending to the samples collected so far.
# (Set `simp_NR = []` by hand to start over.)
try:
    simp_NR
except NameError:
    simp_NR = []

dataset = image_datasets['val']

# The attack object holds no per-sample state, so one instance is enough.
attack = LinfPGDAttack(model_ft, eps, num_steps, step_size)
# Passed through to perturb(), which does not use it.
labels = list(range(1, 43))

# Use range(len(dataset)) to process the whole validation set.
for i in tqdm(range(1000, 1500)):

    x, y = dataset[i]
    # Add the batch dimension without hard-coding the image size.
    x = x.unsqueeze(0)
    X, t = attack.perturb(x, y, labels, model_ft)
    pic = X.detach().cpu().numpy()[0]

    # [adversarial image (CxHxW), target class of the attack, original label]
    simp_NR.append([pic, t.item(), y])

In [0]:
 # save obtained dataset

#torch.save(simp_NR, 'Simpsons_non_robust.pt') # - uncomment this in case you want to save the dataset

In [0]:
# Load the adversarial dataset from the working directory. Later cells unpack
# each entry as (image array, generated target label, original label).
# NOTE: torch.load unpickles the file - only load files you created yourself.
adversarial_ds = torch.load('Simpsons_non_robust.pt')

In [0]:
# without normalization

def imshow2(inp, title=None, plt_ax=plt, default=False):
    """Show a CxHxW image tensor as is (no de-normalisation).

    The values are only clipped to [0, 1] before plotting.

    Args:
        inp: CPU tensor in channel-first layout.
        title: optional axes title.
        plt_ax: axes to draw on. When left at the default (the ``pyplot``
            module) the current axes are used.
        default: unused; kept for interface compatibility.

    Side effect: the figure that owns the axes is saved to
    ``simpsons_dataset.jpg`` on every call.
    """
    # The pyplot module has no set_title() and its `figure` attribute is a
    # function, so the default argument has to be resolved to a real Axes.
    axes = plt.gca() if plt_ax is plt else plt_ax

    inp = inp.numpy().transpose((1, 2, 0))  # CxHxW -> HxWxC
    inp = np.clip(inp, 0, 1)
    axes.imshow(inp)
    if title is not None:
        axes.set_title(title)
    axes.grid(False)
    axes.figure.savefig('simpsons_dataset.jpg')

In [0]:
# let's validate obtained examples: the title shows the original label, the
# white box shows what the classifier predicts for the adversarial image
# (uses `patches` and `FontProperties` imported in an earlier cell)

fig, ax = plt.subplots(nrows=3, ncols=3, figsize=(12, 12), \
                        sharey=True, sharex=True)

# The same font is used for every annotation, so build it once.
font = FontProperties()
font.set_family("fantasy")

for fig_x in ax.flatten():
    # Sample over however many examples the loaded dataset actually holds.
    random_characters = np.random.randint(len(adversarial_ds))
    im_val, label_new, label_old = adversarial_ds[random_characters]

    im_val = torch.tensor(im_val)
    imshow2(im_val.data, title='Original label: ' + label_encoder.classes_[label_old], plt_ax=fig_x)

    # white background box for the prediction text
    fig_x.add_patch(patches.Rectangle((0, 53), 86, 35, color='white'))
    prob_pred = predict_one_sample(model_ft, im_val.unsqueeze(0))
    predicted_proba = np.max(prob_pred)*100
    y_pred = np.argmax(prob_pred)

    predicted_label = label_encoder.classes_[y_pred]
    # break the class name in the middle so it fits into the box
    predicted_label = predicted_label[:len(predicted_label)//2] + '\n' + predicted_label[len(predicted_label)//2:]
    predicted_text = "{} : {:.0f}%".format(predicted_label,predicted_proba)

    fig_x.text(1, 59, predicted_text , horizontalalignment='left', fontproperties=font,
                    verticalalignment='top',fontsize=8, color='black',fontweight='bold')

plt.savefig('simpson_predict.jpg')

In [0]:
!pip install einops

import utils

In [0]:
from utils import compute_loss, compute_accuracy, plot_loss_and_accuracy, plot_classes_preds

In [0]:
# Loader over the adversarial dataset, in stored order.
# NOTE(review): this rebinds the notebook-level `batch_size` (32 in the
# configuration cell); every cell that reads `batch_size` and is re-run
# afterwards will see 10.
batch_size = 10
dataloader_adv = torch.utils.data.DataLoader(adversarial_ds, batch_size=batch_size, shuffle=False, num_workers=4)

In [0]:
# Per-batch accuracy of the original classifier on the adversarial dataset,
# measured against the generated target labels (t) and the original labels (y).
val_accuracy_batch_t = []
val_accuracy_batch_y = []

# Pure evaluation: use inference mode and do not build autograd graphs.
model_ft.eval()
with torch.no_grad():
    for X_batch, t_batch, y_batch in tqdm(dataloader_adv):
        # transferring batch to GPU (the loader already yields tensors, so no
        # extra torch.tensor() copy is needed)
        X_batch_gpu = X_batch.to(device)
        # forward propagation through the model
        logits = model_ft(X_batch_gpu)

        # let's calculate the accuracy:
        accuracy1 = compute_accuracy(logits, t_batch, device=device)
        accuracy2 = compute_accuracy(logits, y_batch, device=device)

        val_accuracy_batch_t.append(accuracy1.item())
        val_accuracy_batch_y.append(accuracy2.item())

In [0]:
# Report the mean over all batches; `accuracy1` / `accuracy2` only hold the
# values of the last batch.
# NOTE(review): assumes compute_accuracy returns a fraction per batch and that
# batches are equally sized, so the mean of batch accuracies equals the
# overall accuracy - confirm.
print('Accuracy on adverasarial dataset (for generated labels): {} %'.format(np.mean(val_accuracy_batch_t) * 100))
print('Accuracy on adverasarial dataset (for original labels): {} %'.format(np.mean(val_accuracy_batch_y) * 100))

In [0]:
# Evaluation loader for the run() function below: the first 500 images of the
# training split, one image per batch, in fixed order (the Kaggle test
# pictures are unlabeled and cannot be used for this).

subset_indices = list(range(500)) # select your indices here as a list
subset = torch.utils.data.Subset(image_datasets['train'], subset_indices)
testloader = torch.utils.data.DataLoader(subset, shuffle=False, num_workers=0, batch_size=1)

In [0]:
from IPython import display

def run(net, train_loader, 
        path_save_dict='./drive/My Drive/Project_DL/checkpoint_renet_50_robust.pth',
        n_epoch=50, lr=0.1, momentum=0.9, scheduled=False, test_loader=None):
  """Train ``net`` with SGD and evaluate it after every epoch.

  Args:
      net: either a two-element container ``[backbone, head]`` that is applied
          as ``head(backbone(x))`` and trained with cross-entropy, or a single
          module called directly and trained with NLL loss.
      train_loader: yields ``(inputs, labels, _)`` triples; the third element
          is ignored.
      path_save_dict: where the state dict of the best model is written.
      n_epoch: number of epochs.
      lr, momentum: SGD hyper-parameters (weight decay is fixed to 5e-4).
      scheduled: if True, the learning rate is multiplied by 0.1 every 15 epochs.
      test_loader: yields ``(images, labels)`` pairs for evaluation. Defaults
          to the notebook-level ``testloader``.

  Returns:
      ``(net, test_accuracy, loss_train, train_accuracy)`` with one entry per
      epoch in each list (accuracies in percent).
  """
  if test_loader is None:
    # Backward compatibility: evaluation used to read this global directly.
    test_loader = testloader

  device = torch.device("cuda" if torch.cuda.is_available() else 'cpu')
  net.to(device)

  # A plain nn.Module has no len(); only containers can be the two-stage case.
  is_two_stage = hasattr(net, '__len__') and len(net) == 2

  def forward(batch):
    """Apply backbone + head for the two-stage case, else call the module."""
    if is_two_stage:
      return net[1](net[0](batch))
    return net(batch)

  if is_two_stage:
    # if resnet
    criterion = nn.CrossEntropyLoss() 
  else:
    # if vgg 
    criterion = nn.NLLLoss()

  optimizer = torch.optim.SGD(net.parameters(), lr=lr, 
                              momentum=momentum, weight_decay=5*1e-4)
  if scheduled:
    scheduler = torch.optim.lr_scheduler.StepLR(optimizer, 
                                                step_size=15, gamma=0.1)

  test_accuracy = []
  train_accuracy = []

  loss_train = []
  # A checkpoint is only written once the test accuracy exceeds 10 %.
  best_accuracy = 10
  for epoch in tqdm(range(n_epoch)):  # loop over the dataset multiple times

      correct = 0
      total = 0
      net.train()
      running_loss = []
      for i, data in enumerate(train_loader):

          # get the inputs; data is a list of [inputs, labels, ignored]
          inputs, labels, _ = data

          # as_tensor avoids the copy (and warning) of torch.tensor(tensor)
          inputs_gpu = torch.as_tensor(inputs).to(device)
          labels_gpu = torch.as_tensor(labels).to(device)

          # zero the parameter gradients
          optimizer.zero_grad()

          # forward + backward + optimize
          outputs = forward(inputs_gpu)

          loss = criterion(outputs, labels_gpu)
          loss.backward()
          optimizer.step()

          # print statistics
          running_loss.append(loss.item())

          # evaluate train accuracy
          with torch.no_grad():
            predicted = nn.functional.softmax(outputs, dim=1).detach().argmax(dim=1)
            total += labels_gpu.size(0)
            correct += (predicted == labels_gpu).sum().item()

      loss_train.append(np.mean(running_loss))
      train_accuracy.append(100 * correct / total)

      # evaluate on test dataloader
      correct = 0
      total = 0
      net.eval()
      with torch.no_grad():
          for data in test_loader:
              images, labels = data
              images = images.to(device)
              labels = labels.to(device)

              outputs = forward(images)

              predicted = nn.functional.softmax(outputs, dim=1).detach().argmax(dim=1)
              total += labels.size(0)
              correct += (predicted == labels).sum().item()

      test_accuracy.append(100 * correct / total)

      # Checkpoint on the *test* accuracy: `best_accuracy` stores test
      # accuracy, so comparing it with the train accuracy mixed two metrics.
      # The first two epochs are never saved.
      if len(test_accuracy) > 2 and test_accuracy[-1] > best_accuracy:
        torch.save(net.state_dict(), path_save_dict)
        best_accuracy = test_accuracy[-1] 

      # Redraw the learning curves in place after every epoch.
      display.clear_output(True)
      plt.figure(figsize=(16, 8))
      plt.subplot(121)
      plt.plot(loss_train, label='train loss')
      plt.xlabel('Epoch')
      plt.ylabel('Loss') 
      plt.legend()

      plt.subplot(122)
      plt.plot(train_accuracy, label='train accuracy')   
      plt.plot(test_accuracy, label='test accuracy')
      plt.xlabel('Iterations')
      plt.ylabel('Accuracy')
      plt.legend()

      plt.show() 
      print('#Epoch: %d | Loss Train: %.3f | Acc train: %.3f | Acc: %.3f' % 
            (epoch, loss_train[-1], train_accuracy[-1], test_accuracy[-1]))

      if scheduled:
        # Decay Learning Rate
        scheduler.step()

  print('Finished Training')
  return net, test_accuracy, loss_train, train_accuracy

In [0]:
# Two-stage model for run(): an ImageNet-pretrained ResNet-50 (1000 outputs)
# followed by a linear head that maps those outputs to the 42 classes.
resnet50 = torchvision.models.resnet50(progress=True, pretrained=True)
classifier_head = nn.Linear(1000, 42)
net = nn.ModuleList([resnet50, classifier_head])

In [0]:
# Train the fresh two-stage network on the adversarial dataset; the best
# state dict is written to the working directory.
net, test_accuracy_drand, train_loss_drand, train_accuracy_drand = run(
    net,
    train_loader=dataloader_adv,
    path_save_dict='checkpoint_resnet_50_drand_sympsons.pth',
    n_epoch=120,
    lr=0.01,
    scheduled=True,
)