# Setup and Installation of Libraries

In [None]:
!pip install cleverhans --quiet
!pip install adversarial-robustness-toolbox --quiet
!pip install multiprocess --quiet
!pip install importlib --quiet
!pip install advertorch --quiet
!pip3 install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu118 --quiet
!pip install git+https://github.com/RobustBench/robustbench.git
!git clone https://github.com/Georgsiedel/adversarial-distance-estimation.git

## Importing Required Libraries for Adversarial Robustness and Model Evaluation

In [None]:
import torch
import torchvision
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, transforms, models
import numba
numba.__version__

import importlib
import time
import matplotlib.pyplot as plt
import numpy as np

from art.estimators.classification import PyTorchClassifier
from art.metrics import clever_u

## Setting the Device for Computation

In [None]:
# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')

## Function to Load and Truncate the CIFAR-10 Dataset for Experiments

In [None]:
def load_dataset(dataset_split):
    """Return the CIFAR-10 test images and labels as two stacked tensors.

    The test split is downloaded to ``./data/cifar`` when missing and every
    image is converted with ``transforms.ToTensor``.

    Args:
        dataset_split: When this is an ``int``, that many samples are drawn
            with ``random_split`` using a generator seeded with 42, so the
            subset is the same on every run. Any other value keeps the full
            test set.

    Returns:
        ``(xtest, ytest)``: the stacked image tensor and the label tensor.
    """
    to_tensor = transforms.Compose([transforms.ToTensor()])
    cifar_test = datasets.CIFAR10(root='./data/cifar', train=False, download=True, transform=to_tensor)

    # Reduced test set for experiments and ablations
    if isinstance(dataset_split, int):
        seeded_generator = torch.Generator().manual_seed(42)
        split_sizes = [dataset_split, len(cifar_test) - dataset_split]
        cifar_test, _ = torch.utils.data.random_split(cifar_test, split_sizes, generator=seeded_generator)

    # Collect images and labels from the (possibly truncated) dataset
    images, labels = [], []
    for sample in cifar_test:
        images.append(sample[0])
        labels.append(sample[1])

    return torch.stack(images), torch.tensor(labels)

## Function to Test Model Accuracy

In [None]:
def test_accuracy(model, xtest, ytest):
    model.eval()
    correct, total = 0, 0

    with torch.no_grad():
        for i in range(len(xtest)):
            x = xtest[i].unsqueeze(0).to(device)
            y = ytest[i].unsqueeze(0).to(device)

            outputs = model(x)
            _, predicted = torch.max(outputs.data, 1)

            total += y.size(0)
            correct += (predicted==y).sum().item()

    accuracy = (correct / total) * 100
    print(f'\nAccuracy of the testset is: {accuracy:.3f}%\n')

## Loading and Initializing Different Types of Models

In [None]:
#%cd /kaggle/working/adversarial-distance-estimation
import models.wideresnet as wideresnet
from robustbench.utils import load_model

modeltype = 'adversarial'

print(f'\nLoading {modeltype} Model...\n')
if modeltype == 'standard':
    net = wideresnet.WideResNet_28_4(10, 'CIFAR10', normalized=True, block=wideresnet.WideBasic, activation_function='relu')
    state_dict = "model_state_dict"
    net = torch.nn.DataParallel(net)
    PATH = f'./models/pretrained_models/{modeltype}.pth'
    model = torch.load(PATH)
    net.load_state_dict(model[state_dict], strict=False)
elif modeltype == 'robust':
    #self trained with massive random data augmentation and JSD consistency loss, but no adversarial objective
    net = wideresnet.WideResNet_28_4(10, 'CIFAR10', normalized=True, block=wideresnet.WideBasic, activation_function='silu')
    net = torch.nn.DataParallel(net)
    state_dict = "model_state_dict"
    PATH = f'./models/pretrained_models/{modeltype}.pth'
    model = torch.load(PATH)
    net.load_state_dict(model[state_dict], strict=False)
elif modeltype == 'adversarial':
    #from https://github.com/BorealisAI/mma_training/tree/master/trained_models/cifar10-Linf-MMA-20-sd0
    model_name = 'Ding2020MMA'
    net = load_model(model_name=model_name, dataset='cifar10', threat_model='Linf')
    net = torch.nn.DataParallel(net)

%cd
net.eval()

In [None]:
#criterion and optimizer do not matter for the evaluation-only in this notebook
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(net.parameters(), lr=0.01)

# ART documents `device_type` as the string 'cpu' or 'gpu'; passing the torch.device object
# only worked because it never compares equal to 'cpu'. Translate it explicitly.
art_device_type = 'gpu' if device.type == 'cuda' else 'cpu'

# NOTE(review): no `clip_values` are passed, so ART does not clip adversarial examples to a
# valid pixel range - confirm this is intended for the distance estimates.
classifier = PyTorchClassifier(model=net,
                               loss=criterion,
                               optimizer=optimizer,
                               input_shape=(3, 32, 32),
                               nb_classes=10,
                               device_type=art_device_type)

## Importing Evasion Attack Methods from the ART Library

In [None]:
from art.attacks.evasion import (FastGradientMethod,
                                 ProjectedGradientDescentNumpy,
                                 AutoAttack,
                                 AutoProjectedGradientDescent,
                                 AutoConjugateGradient,
                                 CarliniLInfMethod,
                                 CarliniL2Method,
                                 NewtonFool,
                                 DeepFool,
                                 ElasticNet,
                                 FrameSaliencyAttack,
                                 HopSkipJump,
                                 BasicIterativeMethod)

## Adversarial Attack Initialization Class

In [None]:
class AdversarialAttacks:
  """Builds ART evasion attacks from one shared classifier and hyper-parameter set.

  The constructor only stores its arguments:
    classifier: estimator handed to every attack constructor.
    epsilon: passed as ``eps`` to the attacks that take a perturbation budget.
    eps_iter: passed as ``eps_step`` (and as ``epsilon`` to DeepFool).
    norm: passed as ``norm`` to the attacks that accept one.
    iterations: ``max_iter`` of PGD, Auto-PGD, Auto-CG, NewtonFool, DeepFool and of the
      BasicIterativeMethod wrapped by the frame-saliency attack.
    second_attack_iters: ``max_iter`` of both Carlini-Wagner attacks, ElasticNet and
      HopSkipJump.
  """

  def __init__(self, classifier, epsilon, eps_iter, norm, iterations, second_attack_iters):
    self.classifier, self.epsilon, self.eps_iter = classifier, epsilon, eps_iter
    self.norm, self.iterations = norm, iterations
    self.second_attack_iters = second_attack_iters

  def init_attacker(self, attack_type, **kwargs):
    """Instantiate the attack registered under ``attack_type``.

    ``kwargs`` are forwarded to the attack constructor for 'fast_gradient_method',
    'projected_gradient_descent', 'auto_projected_gradient_descent',
    'auto_conjugate_gradient', 'carlini_wagner_linf', 'carlini_wagner_l2', 'newton_fool'
    and 'deep_fool'. They are ignored for 'auto_attack', 'elastic_net', 'frame_saliency'
    and 'hop_skip_jump'.

    Raises:
        ValueError: If ``attack_type`` is none of the supported names.
    """
    clf = self.classifier

    def frame_saliency():
        # The frame-saliency attack wraps an inner BasicIterativeMethod attacker.
        inner_attacker = BasicIterativeMethod(clf,
                                              eps=self.epsilon,
                                              eps_step=self.eps_iter,
                                              max_iter=self.iterations)
        return FrameSaliencyAttack(clf, inner_attacker, method='iterative_saliency')

    # Ordered (name, builder) pairs. Builders are lazy, so only the requested attack is built.
    builders = (
        ('fast_gradient_method',
         lambda: FastGradientMethod(clf, eps=self.epsilon, eps_step=self.eps_iter,
                                    minimal=True, norm=self.norm, **kwargs)),
        ('projected_gradient_descent',
         lambda: ProjectedGradientDescentNumpy(clf, eps=self.epsilon, eps_step=self.eps_iter,
                                               max_iter=self.iterations, norm=self.norm, **kwargs)),
        ('auto_attack',
         lambda: AutoAttack(estimator=clf, eps=self.epsilon, eps_step=self.eps_iter, norm=self.norm)),
        ('auto_projected_gradient_descent',
         lambda: AutoProjectedGradientDescent(estimator=clf, eps=self.epsilon, eps_step=self.eps_iter,
                                              norm=self.norm, max_iter=self.iterations, **kwargs)),
        ('auto_conjugate_gradient',
         lambda: AutoConjugateGradient(estimator=clf, eps=self.epsilon, eps_step=self.eps_iter,
                                       norm=self.norm, max_iter=self.iterations, **kwargs)),
        ('carlini_wagner_linf',
         lambda: CarliniLInfMethod(clf, max_iter=self.second_attack_iters, **kwargs)),
        ('carlini_wagner_l2',
         lambda: CarliniL2Method(clf, max_iter=self.second_attack_iters, **kwargs)),
        ('newton_fool',
         lambda: NewtonFool(clf, max_iter=self.iterations, **kwargs)),
        ('deep_fool',
         lambda: DeepFool(clf, max_iter=self.iterations, epsilon=self.eps_iter, **kwargs)),
        ('elastic_net',
         lambda: ElasticNet(clf, max_iter=self.second_attack_iters)),
        ('frame_saliency', frame_saliency),
        ('hop_skip_jump',
         lambda: HopSkipJump(clf, norm=self.norm, max_iter=self.second_attack_iters)),
    )

    for name, build in builders:
        if attack_type == name:
            return build()
    raise ValueError(f'Attack type "{attack_type}" not supported!')

## Plug-in Function for Adversarial Attacks with Early Stopping

In [None]:
def attack_with_early_stopping(classifier, x, y, max_iterations, attacker):
    """Re-apply ``attacker`` to one sample until the predicted label changes.

    Each round feeds the previous adversarial output back into
    ``attacker.generate`` and stops as soon as ``classifier.predict`` no longer
    returns the true label.

    Args:
        classifier: Object whose ``predict`` takes a numpy batch and returns
            per-class scores.
        x: Tensor holding a single sample without a batch dimension.
        y: One-element tensor with the true label.
        max_iterations: Maximum number of attack rounds.
        attacker: Object whose ``generate(x, y)`` takes and returns numpy arrays.

    Returns:
        ``(adv_inputs, runtime, j)``: the final numpy batch (with a leading
        batch dimension), the elapsed seconds and the zero-based index of the
        last executed round. A sample that is misclassified before any attack
        is returned unchanged with ``j == 0``.

        NOTE: ``j == max_iterations - 1`` is returned both when the attack
        succeeds in the very last round and when it never succeeds, so callers
        cannot tell the two apart from ``j`` alone.

    Raises:
        ValueError: If the sample is correctly classified and
            ``max_iterations < 1`` (this used to fail with an
            ``UnboundLocalError``).
    """
    start_time = time.time()

    # Convert once: the attack and the classifier both work on numpy arrays, so the
    # loop never needs to move data to the torch device and back.
    true_label = int(y.item())
    x_np = x.unsqueeze(0).cpu().detach().numpy()
    y_np = y.cpu().detach().numpy()

    clean_predicted = int(np.argmax(classifier.predict(x_np), axis=1)[0])
    if clean_predicted != true_label:
        print('Misclassified input. Not attacking.')
        return x_np, time.time() - start_time, 0

    if max_iterations < 1:
        raise ValueError(f'max_iterations must be at least 1, got {max_iterations}.')

    adv_inputs = x_np
    for j in range(max_iterations):
        # Continue from the previous adversarial output rather than from the clean input.
        adv_inputs = attacker.generate(adv_inputs, y_np)
        predicted = int(np.argmax(classifier.predict(adv_inputs), axis=1)[0])

        if predicted != true_label:
            print(f'\tIterations for successful iterative attack: {j+1}')
            break

    return adv_inputs, time.time() - start_time, j

## CLEVER Score Calculation Function

In [None]:
def clever_score_calculation(classifier, xtest, max_epsilon, nb_batch, batch_size, norm):
  """Compute one untargeted CLEVER score per sample with ``clever_u``.

  Args:
    classifier: Estimator passed to ``clever_u``.
    xtest: Tensor of samples. It is moved to the CPU and converted to numpy.
    max_epsilon: Passed to ``clever_u`` as ``radius``.
    nb_batch: Passed to ``clever_u`` as ``nb_batches``.
    batch_size: Passed to ``clever_u`` as ``batch_size``.
    norm: Passed to ``clever_u`` as ``norm``.

  Returns:
    Dict with the parallel lists ``'images_id'`` (sample index), ``'clever_score'``
    and ``'runtime'`` (seconds spent on each sample).
  """
  torch.cuda.empty_cache()

  # clever_u takes one numpy sample at a time
  samples = xtest.cpu().numpy()

  images_id, clever_scores, runtimes = [], [], []

  for image in range(len(xtest)):
    tic = time.time()
    clever_score = clever_u(classifier,
                            x=samples[image],
                            nb_batches=nb_batch,
                            batch_size=batch_size,
                            radius=max_epsilon,
                            norm=norm,
                            pool_factor=3)
    elapsed_time = time.time() - tic

    # Keep id, score and runtime aligned by position
    images_id.append(image)
    clever_scores.append(clever_score)
    runtimes.append(elapsed_time)

    print(f"Image: {image}, Score: {clever_score}, Runtime: {elapsed_time} sec")

  results_dict = dict(images_id=images_id, clever_score=clever_scores, runtime=runtimes)
  print(f'\nTotal runtime for {len(xtest)} images is {np.sum(results_dict["runtime"])} seconds\n')
  return results_dict

## Combined Adversarial Distance and CLEVER Score Calculation Function

In [None]:
def combined_adv_dist_clever_score(classifier, xtest, ytest, epsilon, eps_iter, norm, max_iterations, clever_configs: list, get_image: bool = False, verbose: bool = False):
    """Attack every sample with two attacks, then compute CLEVER scores.

    Each sample is attacked with single-step PGD repeated through
    ``attack_with_early_stopping`` and, if it is classified correctly, with a
    second attack chosen by ``norm`` (1: ElasticNet, 2: Carlini-Wagner L2,
    anything else: HopSkipJump). The largest per-sample minimum adversarial
    distance is then used as the sampling radius of ``clever_score_calculation``
    for every entry of ``clever_configs``.

    Args:
        classifier: ART classifier. Its ``model`` is moved to the selected device.
        xtest: Tensor of samples.
        ytest: Tensor of integer labels aligned with ``xtest``.
        epsilon: Perturbation budget handed to ``AdversarialAttacks``.
        eps_iter: Step size handed to ``AdversarialAttacks``.
        norm: Attack norm. It is also used, formatted as a string, as the top-level
            key of the returned dict.
        max_iterations: Maximum number of PGD rounds per sample.
        clever_configs: Iterable of ``(nb_batch, batch_size)`` pairs.
        get_image: Currently unused.
        verbose: Forwarded to the PGD attack only.

    Returns:
        ``(results_dict, max_adv_dist)``. ``results_dict[str(norm)]`` holds the
        per-sample lists ``'adversarial_distance_pgd'``, ``'iterations_pgd'``,
        ``'adversarial_distance_second_attack'`` and ``'indices'``, the scalar
        ``'max_adversarial_distance'`` and a ``'clever_score'`` dict keyed by
        ``'<nb_batch>-<batch_size>'``.

    Raises:
        ValueError: If ``xtest`` is empty, or if no attack produced an
            adversarial example so that no CLEVER radius can be derived.
    """
    if len(xtest) == 0:
        # The accuracy computation below would otherwise fail with a ZeroDivisionError.
        raise ValueError('xtest is empty: there is nothing to attack.')

    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    classifier.model.to(device)
    xtest = xtest.to(device)
    ytest = ytest.to(device)

    def predicted_label(batch):
        # Index of the largest classifier output for a single-sample numpy batch.
        return int(np.argmax(classifier.predict(batch), axis=1)[0])

    minimum_adversarial_distance = []
    norm_results = {
        'clever_score': {},
        'adversarial_distance_pgd': [],
        'iterations_pgd': [],
        'adversarial_distance_second_attack': [],
        'indices': [],
        'max_adversarial_distance': 0.0
    }
    results_dict = {f'{norm}': norm_results}

    # iterations=1: every call to generate() performs one PGD step; the repetition and the
    # early stopping happen in attack_with_early_stopping.
    attacks = AdversarialAttacks(classifier=classifier,
                                 epsilon=epsilon,
                                 eps_iter=eps_iter,
                                 norm=norm,
                                 iterations=1,
                                 second_attack_iters=100)

    iterative_attack_type = 'projected_gradient_descent'
    attacker1 = attacks.init_attacker(iterative_attack_type, verbose=verbose)

    if norm == 1:
        second_attack_type = 'elastic_net'
    elif norm == 2:
        second_attack_type = 'carlini_wagner_l2'
    else:
        second_attack_type = 'hop_skip_jump'
    attacker_2 = attacks.init_attacker(second_attack_type)

    correct_prediction_1, correct_prediction_2 = 0, 0

    for i, x in enumerate(xtest):
        x = x.to(device)
        y = ytest[i].unsqueeze(0).to(device)
        true_label = int(y.item())

        # First Attack
        print(f'\nAttacking image {i}\n\t-with {iterative_attack_type}')
        x_adversarial_1, _, j = attack_with_early_stopping(classifier=classifier,
                                                           x=x,
                                                           y=y,
                                                           max_iterations=max_iterations,
                                                           attacker=attacker1)

        x_adversarial_tensor = torch.from_numpy(x_adversarial_1).to(device)

        # Adversarial accuracy calculation
        correct_prediction_1 += (predicted_label(x_adversarial_1) == true_label)

        distance_1 = torch.norm((x - x_adversarial_tensor), p=float(norm)).item()
        norm_results['adversarial_distance_pgd'].append(distance_1)
        norm_results['iterations_pgd'].append(j)

        # Second Attack
        x = x.unsqueeze(0)

        if predicted_label(x.cpu().numpy()) == true_label:
            print(f'\t-with {second_attack_type}\n')
            x_adversarial_2 = attacker_2.generate(x=x.cpu().numpy(),
                                                  y=np.expand_dims(y.cpu().numpy(), axis=0))
            x_adversarial_tensor = torch.tensor(x_adversarial_2).to(device)
        else:
            # Misclassified inputs are not attacked; their distance is recorded as 0.
            x_adversarial_2 = x.cpu().numpy()
            x_adversarial_tensor = x.to(device)

        # Adversarial accuracy calculation
        correct_prediction_2 += (predicted_label(x_adversarial_2) == true_label)

        distance_2 = torch.norm((x - x_adversarial_tensor), p=float(norm)).item()
        norm_results['adversarial_distance_second_attack'].append(distance_2)

        print(f'distance1: {distance_1:.4f}\tdistance2: {distance_2:.4f}\timage index: {i}')
        norm_results['indices'].append(i)

        # j below the last round index means PGD stopped early, i.e. it succeeded (or the input
        # was misclassified, in which case distance_1 is 0). A second-attack distance of 0 means
        # that attack gave no adversarial example.
        # NOTE: a PGD success in the very last round also yields j == max_iterations - 1 and is
        # therefore treated like a failure here.
        if j < (max_iterations - 1):
            if distance_2 > 0.0:
                minimum_adversarial_distance.append(min(distance_1, distance_2))
            else:
                minimum_adversarial_distance.append(distance_1)
        else:
            if distance_2 > 0.0:
                minimum_adversarial_distance.append(distance_2)

    adversarial_accuracy_1 = (correct_prediction_1 / len(xtest)) * 100
    adversarial_accuracy_2 = (correct_prediction_2 / len(xtest)) * 100
    print(f'Adversarial accuracy of {adversarial_accuracy_1}  (iterative attack) {adversarial_accuracy_2} (second attack).'
          'If the lower values is not close to 0, increase attack strength for accurate adversarial distance estimation!')

    # Clever Score Calculation
    if not minimum_adversarial_distance:
        # Same exception type as the bare max() call raised before, with an actionable message.
        raise ValueError('No attack produced an adversarial example, so no CLEVER radius can be derived. '
                         'Increase epsilon or max_iterations.')
    max_adv_dist = max(minimum_adversarial_distance)
    norm_results['max_adversarial_distance'] = max_adv_dist
    print(f'\nmin list: {minimum_adversarial_distance}\n')
    print(f'\nClever calculation will be done with maximum adversarial distance: {max_adv_dist}\n')

    for nb_batch, batch_size in clever_configs:
        print(f'Config: [{nb_batch}, {batch_size}]')
        results_dict_clever = clever_score_calculation(classifier=classifier,
                                                       xtest=xtest,
                                                       max_epsilon=max_adv_dist,
                                                       nb_batch=nb_batch,
                                                       batch_size=batch_size,
                                                       norm=norm)

        norm_results['clever_score'][f'{nb_batch}-{batch_size}'] = np.array(results_dict_clever['clever_score']).tolist()

    return results_dict, max_adv_dist

# Experiments

## Parameters

In [None]:
# Attack norm and the maximum number of single-step PGD rounds per image.
norm = 2
max_iterations = 500

# PGD step size for each supported norm, keyed by the norm formatted as a string.
eps_iter_dict = {'inf': 0.0003, '1': 0.2, '2': 0.005}
eps_iter = eps_iter_dict[f'{norm}']

# Total budget: the distance reached when every round takes a full step.
epsilon = max_iterations * eps_iter

# (nb_batch, batch_size) pairs evaluated by the CLEVER score calculation.
clever_configs = [
    (5, 5),
    (10, 20),
    (50, 100),
    (500, 1024),
]

## Load the dataset

In [None]:
splitsize = 500        # full, int: splitsize
# Load the (possibly truncated) test set and move images and labels to the compute device.
xtest, ytest = (tensor.to(device) for tensor in load_dataset(dataset_split=splitsize))

## Test Accuracy

In [None]:
# Clean accuracy of the loaded network on the selected test samples.
test_accuracy(net, xtest, ytest)

## Adversarial Distance and Clever Score Calculation

In [None]:
# Run both attacks on every test sample, then the CLEVER calculation for every config.
experiment_output = combined_adv_dist_clever_score(
    classifier,
    xtest=xtest,
    ytest=ytest,
    epsilon=epsilon,
    eps_iter=eps_iter,
    norm=norm,
    max_iterations=max_iterations,
    clever_configs=clever_configs,
)
results_dict, max_adversarial_distance = experiment_output

## Processing Adversarial Distances and Calculating Minimum Attack Values

In [None]:
# Use the scores of the last (nb_batch, batch_size) entry in clever_configs.
nb_batch, batch_size = clever_configs[-1]
clever_config_key = f'{nb_batch}-{batch_size}'
clever_values = results_dict[f'{norm}']['clever_score'][clever_config_key]

In [None]:
min_attack_value, colors_attack, colors_clever = [], [], []

# Look the per-norm result lists up once instead of on every comparison.
norm_results = results_dict[f"{norm}"]
pgd_distances = norm_results['adversarial_distance_pgd']
second_distances = norm_results['adversarial_distance_second_attack']
pgd_iterations = norm_results['iterations_pgd']
clever_reference = norm_results['clever_score'][f'{nb_batch}-{batch_size}']
last_iteration = max_iterations - 1

for i in range(len(pgd_distances)):
    pgd_distance = pgd_distances[i]

    # Misclassified inputs
    if pgd_distance == 0.0:
        colors_attack.append('blue')
        colors_clever.append('black')
        clever_values[i] = None
        min_attack_value.append(0.0)
        continue

    second_distance = second_distances[i]
    iterations = pgd_iterations[i]

    if modeltype == 'adversarial' and second_distance == 0.0:
        # Both attacks unsuccessful
        if iterations == last_iteration:
            colors_attack.append('blue')
            colors_clever.append('black')
            min_attack_value.append(max_adversarial_distance)
            continue

        # Second attack found nothing but PGD works
        if iterations < last_iteration:
            colors_attack.append('blue')
            min_attack_value.append(pgd_distance)
            colors_clever.append('red' if clever_reference[i] > pgd_distance else 'black')
            continue

    # PGD found nothing but the second attack works
    if iterations == last_iteration and second_distance > 0.0:
        colors_attack.append('green')
        min_attack_value.append(second_distance)
        colors_clever.append('red' if clever_reference[i] > second_distance else 'black')
        continue

    # Otherwise take the smaller of the two distances
    if second_distance < pgd_distance:
        colors_attack.append('green')
        min_attack_value.append(second_distance)
    else:
        colors_attack.append('blue')
        min_attack_value.append(pgd_distance)

    colors_clever.append('black' if min_attack_value[i] >= clever_reference[i] else 'red')

## Save the data as JSON

In [None]:
# Order every per-image list by ascending minimum adversarial distance.
sorted_indices = np.argsort(min_attack_value)
(min_attack_value_sorted,
 clever_values_sorted,
 colors_attack_sorted,
 colors_clever_sorted) = (
    np.array(values)[sorted_indices].tolist()
    for values in (min_attack_value, clever_values, colors_attack, colors_clever)
)

In [None]:
import json
from pathlib import Path

json_file_path = f'/kaggle/working/adversarial-distance-estimation/data/adv_dist_vs_clever_{modeltype}_{norm}.json'

# open(..., 'w') does not create missing directories, so make sure the target folder exists
# before writing. Otherwise the finished experiment is lost to a FileNotFoundError.
Path(json_file_path).parent.mkdir(parents=True, exist_ok=True)

with open(json_file_path, 'w') as f:
    json.dump(results_dict, f, sort_keys=True)
print(f'Evaluation results are saved under "{json_file_path}".')

# Plotting

In [None]:
import matplotlib.pyplot as plt
import matplotlib.lines as mlines

# Error proportion: share of images with a CLEVER score that exceeds the smallest
# adversarial distance found. Images without a score (None) are skipped.
total_points = 0
red_points = 0

image_ids = np.arange(len(min_attack_value))

for i in range(len(image_ids)):
    if clever_values_sorted[i] is not None:
        total_points += 1
        if clever_values_sorted[i] > min_attack_value_sorted[i]:
            red_points += 1

error_proportion = (red_points / total_points) * 100 if total_points > 0 else 0

# Means ignore missing (None) and NaN entries.
mean_clever_score = np.mean([x for x in clever_values_sorted if x is not None and not np.isnan(x)])
mean_min_attack_value = np.mean([x for x in min_attack_value_sorted if not np.isnan(x)])

# Plotting
plt.figure(figsize=(20, 8))
plt.scatter(image_ids, min_attack_value_sorted, color=colors_attack_sorted)
plt.scatter(image_ids, clever_values_sorted, color=colors_clever_sorted)
# Adding labels and title with error proportion
plt.xlabel('Image ID')
plt.ylabel(f'L{norm} Distance')
plt.title(f'Error Proportion: {error_proportion:.2f}%   Mean Clever Score: {mean_clever_score:.4f}   Mean Adversarial Distance: {mean_min_attack_value:.4f}', fontsize=14)
plt.xticks(np.arange(0, len(sorted_indices), 10))

# The legend is built by hand because the scatter calls carry per-point colours and no labels.
# The operators match the colouring rule: red marks a CLEVER score strictly greater than the
# adversarial distance, black marks everything else. Raw strings keep the LaTeX backslashes
# from being read as (invalid) Python escape sequences.
legend_elements = [
    mlines.Line2D([0], [0], marker='o', color='w', markerfacecolor='blue', label='Adversarial Distance (PGD)'),
    mlines.Line2D([0], [0], marker='o', color='w', markerfacecolor='green', label='Adversarial Distance (Second attack)'),
    mlines.Line2D([0], [0], marker='o', color='w', markerfacecolor='red', label=r'Clever Score $>$ Adversarial Distance'),
    mlines.Line2D([0], [0], marker='o', color='w', markerfacecolor='black', label=r'Clever Score $\leq$ Adversarial Distance')
]

plt.legend(handles=legend_elements, fontsize=10)
plt.show()