# Head-On-Stomach Experiment: Success Rate

In [None]:
# Set up the dataset by running the project's setup script.
# You might have to adjust the data_path in the settings.py and setup.py files first.
!python src/data/setup.py

In [None]:
# External Imports
import os
import re
import time
from io import BytesIO
from PIL import Image
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, utils, datasets
from torch.autograd import Variable
import matplotlib.pyplot as plt
import cv2
import sys
from tqdm.notebook import tqdm
import copy
import itertools

# GitLab Code imports
sys.path.insert(0, './src/models/')
import src.training.train_and_test as tnt
from src.data.preprocess import mean, std
from settings import img_size, test_batch_size
from src.data.customdataset import CustomImageFolder
from src.data.preprocess import mean, std, undo_preprocess_input_function
from src.utils.local_analysis import LocalAnalysis
from src.utils.helpers import find_high_activation_crop, get_all_xy

from settings import colab, username

# Random Seed
# Fix the seed of PyTorch's random number generator so repeated runs of the
# notebook draw the same random numbers from torch.
random_seed = 42
torch.manual_seed(random_seed)

In [None]:
class HiddenPrints:
    """
    Context manager that silences everything written to ``sys.stdout``.

    On entry ``sys.stdout`` is pointed at ``os.devnull``; on exit the stream
    that was active before entering is put back, even if the body raised.
    Exceptions raised inside the ``with`` body are not suppressed.
    """

    def __enter__(self):
        self._original_stdout = sys.stdout
        # Keep our own handle so __exit__ closes exactly the file opened here,
        # even if the body rebinds sys.stdout to something else.
        self._devnull = open(os.devnull, 'w')
        sys.stdout = self._devnull
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Restore first so stdout is usable again even if close() fails.
        sys.stdout = self._original_stdout
        self._devnull.close()

In [None]:
# Dataset location
# Adjust this path if the cropped CUB-200 data lives somewhere else
directory = "/scratch/PPNet/datasets/cub200_cropped/"
test_dir = directory + "test_cropped/"

normalize = transforms.Normalize(mean=mean, std=std)

# Every test image is resized to a square, converted to a tensor and normalized
test_dataset = datasets.ImageFolder(
    root=test_dir,
    transform=transforms.Compose([
        transforms.Resize((img_size, img_size)),
        transforms.ToTensor(),
        normalize,
    ]),
)

# Fixed (unshuffled) order so batch positions map directly to dataset indices
test_loader = DataLoader(
    dataset=test_dataset,
    batch_size=test_batch_size,
    shuffle=False,
    num_workers=4,
    pin_memory=False,
)

In [None]:
def get_correct_classified_images(
    model,
    dataloader,
    class_specific=True,
    log=print
):
    """
    Get indices of correctly classified images.

    Runs ``model`` in eval mode over every batch of ``dataloader`` and records
    the running position (in iteration order) of each sample whose arg-max
    prediction equals its label. The positions only correspond to dataset
    indices when the loader iterates the dataset in order (``shuffle=False``).

    Args:
        model: module whose forward pass returns ``(output, min_distances)``;
            only ``output`` is used here.
        dataloader: iterable yielding ``(image, label)`` batches.
        class_specific: unused by this function; kept so existing callers that
            pass it keep working.
        log: callable used to report elapsed time and accuracy.

    Returns:
        List of int positions of the correctly classified samples. Empty if
        the loader yields nothing.
    """
    log("\ttest")
    model.eval()

    # Feed batches to the device that holds the model weights. A model without
    # parameters falls back to CUDA, which is what this function always used.
    try:
        device = next(model.parameters()).device
    except StopIteration:
        device = torch.device("cuda")

    start = time.time()
    n_examples = 0
    n_correct = 0
    correct_idx = []

    # Pure inference: no autograd graph is needed, which saves memory and time.
    with torch.no_grad():
        for image, label in dataloader:
            images = image.to(device)
            target = label.to(device)

            output, _ = model(images)
            _, predicted = torch.max(output, 1)
            hits = predicted == target

            # Offset the in-batch positions by the samples already seen.
            correct_idx.extend((hits.nonzero().squeeze(1) + n_examples).tolist())

            n_examples += target.size(0)
            n_correct += hits.sum().item()

    end = time.time()
    # Guard against an empty loader instead of dividing by zero.
    accuracy = n_correct / n_examples * 100 if n_examples else 0.0
    log("\ttime: \t{0}".format(end - start))
    log("\taccu: \t{0}%".format(accuracy))
    return correct_idx

In [None]:
# Adversarially trained network: choose the backbone and the experiment run
base_architecture = 'resnet34'  # 'resnet34', 'vgg19'
experiment_run = '105'

# Checkpoint location depends on whether the notebook runs on Colab or the cluster
load_model_dir = (
    '/content/PPNet/saved_models/{}/{}/'.format(base_architecture, experiment_run)
    if colab
    else '/cluster/scratch/{}/PPNet/saved_models/{}/{}/'.format(
        username, base_architecture, experiment_run
    )
)
load_model_name = '10_0push0.2416.pth'

# Load the checkpoint through the project's LocalAnalysis wrapper
la1 = LocalAnalysis(load_model_dir, load_model_name, "", attack=1)
img_size = la1.img_size  # from here on, use the input size reported by the loaded model

preprocess = transforms.Compose(
    [
        transforms.Resize((img_size, img_size)),
        transforms.ToTensor(),
        la1.normalize,
    ]
)

# Positions of the test images this model classifies correctly
print("\nAccuracy on test data set.")
corr_rob = get_correct_classified_images(
    model=la1.ppnet_multi,
    dataloader=test_loader,
    class_specific=la1.class_specific,
    log=la1.log,
)

In [None]:
# Standard-trained network: choose the backbone and the experiment run
base_architecture = 'resnet34'  # 'resnet34', 'vgg19'
experiment_run = '004'

# Checkpoint location depends on whether the notebook runs on Colab or the cluster
load_model_dir = (
    '/content/PPNet/saved_models/{}/{}/'.format(base_architecture, experiment_run)
    if colab
    else '/cluster/scratch/{}/PPNet/saved_models/{}/{}/'.format(
        username, base_architecture, experiment_run
    )
)
load_model_name = '10_18push0.7958.pth'

# Load the checkpoint through the project's LocalAnalysis wrapper
la2 = LocalAnalysis(load_model_dir, load_model_name, "", attack=1)

# Positions of the test images this model classifies correctly
print("\nAccuracy on test data set.")
corr_std = get_correct_classified_images(
    model=la2.ppnet_multi,
    dataloader=test_loader,
    class_specific=la2.class_specific,
    log=la2.log,
)

In [None]:
NUM_SAMPLES = 200

# Keep only the images that BOTH models classify correctly
corr_idx = np.array(list(set(corr_std) & set(corr_rob)))
np.random.seed(0)
# Draw 50 images more than NUM_SAMPLES: analyze() skips samples that raise and
# stops after NUM_SAMPLES evaluated ones, so the extras act as spares.
# np.random.choice(..., replace=False) raises ValueError when asked for more
# items than exist, so never request more than are available.
n_draw = min(NUM_SAMPLES + 50, len(corr_idx))
corr_idx = corr_idx[np.random.choice(len(corr_idx), n_draw, replace=False)].tolist()
print ('{} images to test.'.format(len(corr_idx)))

# One image per batch, in the sampled order
correct_subset = torch.utils.data.Subset(test_dataset, corr_idx)
correct_loader = torch.utils.data.DataLoader(
    correct_subset,
    batch_size=1,
    shuffle=False,
    num_workers=1,
    pin_memory=False,
)

In [None]:
def analyze(
    la,
    dataloader,
    log=print,
    grid=7,
    max_prototyopes=5
):
    """
    Estimate how often the attack moves a prototype's strongest activation.

    For every sample yielded by ``dataloader`` (expected batch size 1, since
    ``label.item()`` is called) the clean image is analysed with
    ``la.local_analysis``. Then, for up to ``max_prototyopes`` prototypes taken
    from the end of ``sorted_indices_act``:

    1. the prototype's high-activation region on the clean image is mapped
       onto a ``grid`` x ``grid`` cell layout,
    2. ``la.attack1`` is run on all cells outside that region,
    3. the perturbed image is analysed again, and the attack counts as a
       success if the prototype's maximum activation over the whole map is
       larger than its maximum inside the original region.

    A sample counts as attacked as soon as one prototype succeeds. Samples
    that raise an exception are skipped and their loader position is recorded.
    Evaluation stops once the module-level ``NUM_SAMPLES`` samples have been
    evaluated.

    Args:
        la: analysis object providing ``ppnet_multi``, ``img_size``,
            ``normalize``, ``local_analysis`` and ``attack1``.
        dataloader: iterable of ``(image, label)`` pairs with one image each.
        log: callable used for the final timing / success-rate report.
        grid: number of cells per image side.
        max_prototyopes: maximum number of prototypes tried per image (the
            misspelling is kept so existing keyword callers keep working).

    Returns:
        Tuple ``(success_rate, errors)``: the fraction of evaluated samples
        that were attacked successfully (0.0 if none was evaluated) and the
        list of loader positions that raised.
    """
    la.ppnet_multi.eval()
    start = time.time()
    n_examples = 0
    n_attacked = 0
    errors = []

    # Cell side length in pixels. Derived from the analysed model's own input
    # size so it matches the upsampling resolution used below.
    step = la.img_size // grid
    # All cells of the grid; loop-invariant, so built once.
    all_cells = set(list(itertools.product(np.arange(0, grid), np.arange(0, grid))))
    idx = 0  # index of the single image in the batch

    pbar = tqdm(dataloader)
    for k, (image, label) in enumerate(pbar):
        images = image.cuda()
        target = label.item()

        try:
            with HiddenPrints():
                sorted_indices_act, clean_patterns, _ = la.local_analysis(
                    images, target, show_images=False, max_prototypes=1
                )

            # NOTE(review): presumably sorted_indices_act is ascending, so
            # [-1] is the most activated prototype; confirm in LocalAnalysis.
            for rank in range(1, max_prototyopes + 1):
                PID = sorted_indices_act[-rank].item()

                # Always localise the prototype on the CLEAN image: the attack
                # below is applied to the clean input, so the region it must
                # avoid has to come from the clean activation map, not from a
                # map produced while attacking a previous prototype.
                activation_pattern = clean_patterns[idx][PID].detach().cpu().numpy()
                upsampled_activation_pattern = cv2.resize(
                    activation_pattern, dsize=(la.img_size, la.img_size), interpolation=cv2.INTER_CUBIC
                )
                high_act_patch_indices = find_high_activation_crop(upsampled_activation_pattern)

                # Pixel bounds -> cell bounds.
                # NOTE(review): presumably (row_start, row_end, col_start,
                # col_end); confirm against find_high_activation_crop.
                r0, r1, c0, c1 = (high_act_patch_indices[j] // step for j in range(4))
                act_loc = list(
                    itertools.product(
                        np.arange(r0, min(r1 + 1, grid)), np.arange(c0, min(c1 + 1, grid))
                    )
                )
                # Cells the attack may modify: everything outside the region.
                loc = list(all_cells - set(act_loc))

                with HiddenPrints():
                    image_perturbed, _ = la.attack1(images, loc=loc, i=PID, idx=0, show_images=False)

                    _, perturbed_patterns, _ = la.local_analysis(
                        la.normalize(image_perturbed.squeeze(0)).unsqueeze(0),
                        target, show_images=False, max_prototypes=2000, pid=PID
                    )

                p_act = perturbed_patterns[idx][PID]
                # Activations of the perturbed image inside the original region.
                act_sim = torch.zeros(len(act_loc)).cuda()
                for i, cell in enumerate(act_loc):
                    act_sim[i] = p_act[cell[0], cell[1]]

                # Success: the strongest activation now lies outside the region.
                if torch.max(p_act) > torch.max(act_sim):
                    n_attacked += 1
                    break
            n_examples += 1
        except Exception:
            # Best effort: skip samples that fail, but let KeyboardInterrupt
            # and SystemExit propagate so the run can still be stopped.
            errors.append(k)

        pbar.set_postfix({'Success rate': f'{n_attacked}/{n_examples}'})
        if n_examples == NUM_SAMPLES:
            print (f'Evaluation on {n_examples} samples completed.')
            break

    end = time.time()
    # Avoid dividing by zero when no sample could be evaluated.
    success_rate = n_attacked / n_examples if n_examples else 0.0
    log("\ttime: \t\t\t{0}".format(end - start))
    log("\tattack success rate: \t{0}%".format(success_rate * 100))
    return success_rate, errors

In [None]:
# Attack success rate for the standard-trained network (la2)
print('Standard trained model.')
acc, errors = analyze(la=la2, dataloader=correct_loader)

In [None]:
# Attack success rate for the adversarially trained network (la1)
print('Adversarially trained model.')
acc, errors = analyze(la=la1, dataloader=correct_loader)

In [None]:
# Clean-up: recursively deletes /scratch/PPNet, the parent of the dataset
# `directory` configured earlier in this notebook. This cannot be undone.
!rm -rf /scratch/PPNet