In [1]:
!pip install cleverhans --quiet
!pip install adversarial-robustness-toolbox --quiet
!pip install multiprocess --quiet
!pip install importlib --quiet

In [2]:
import torch
import torchvision
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, transforms, models
import numba
numba.__version__

import importlib
import time
import matplotlib.pyplot as plt
import numpy as np

from art.estimators.classification import PyTorchClassifier
from art.metrics import clever_u

In [3]:
from art.attacks.evasion import (FastGradientMethod,
                                 ProjectedGradientDescentPyTorch,
                                 AutoAttack,
                                 AutoProjectedGradientDescent,
                                 AutoConjugateGradient,
                                 CarliniLInfMethod,
                                 CarliniL2Method,
                                 NewtonFool,
                                 DeepFool,
                                 ElasticNet,
                                 FrameSaliencyAttack,
                                 HopSkipJump,
                                 BasicIterativeMethod)

In [4]:
def load_dataset(dataset_split: int):
  # Load CIFAR-10 dataset using torchvision
  transform = transforms.Compose([transforms.ToTensor()])
  testset = datasets.CIFAR10(root='./data', train=False, download=True, transform=transform)

  # Truncated testset for experiments and ablations
  truncated_testset, _ = torch.utils.data.random_split(testset,
                                                      [dataset_split, len(testset) - dataset_split],
                                                      generator=torch.Generator().manual_seed(42))

  # Extract data and labels from torchvision dataset
  xtest = torch.stack([data[0] for data in truncated_testset])
  ytest = torch.tensor([data[1] for data in truncated_testset])

  return xtest, ytest

In [14]:
import models.experiments.wideresnet as wideresnet

#use for standard model
#net = wideresnet.WideResNet_28_4(10, 1, 'CIFAR10', normalized=True, block=wideresnet.WideBasic, dropout_rate=0.2, activation_function='relu')
#use for corruption robust model
net = wideresnet.WideResNet_28_4(10, 1, 'CIFAR10', normalized=True, block=wideresnet.WideBasic, dropout_rate=0.2, activation_function='silu')
net = torch.nn.DataParallel(net)
# PATH = './models/pretrained_models/standard.pth'
PATH = './models/pretrained_models/robust.pth'
state_dict = torch.load(PATH)
net.load_state_dict(state_dict["model_state_dict"], strict=False)
net.eval()

RuntimeError: Attempting to deserialize object on a CUDA device but torch.cuda.is_available() is False. If you are running on a CPU-only machine, please use torch.load with map_location=torch.device('cpu') to map your storages to the CPU.

Adversarial Attacks Class

In [None]:
class AdversarialAttacks:
  def __init__(self, classifier, epsilon, eps_iter, norm, iterations, second_attack_iterations):
    self.classifier = classifier
    self.epsilon = epsilon
    self.eps_iter = eps_iter
    self.norm = norm
    self.iterations = iterations
    self.second_attack_iters = second_attack_iters

  def init_attacker(self, attack_type, **kwargs):

    if attack_type=='fast_gradient_method':
        return FastGradientMethod(self.classifier,
                                eps=self.epsilon,
                                eps_step=self.eps_iter,
                                minimal=True,
                                norm=self.norm,
                                **kwargs)
    elif attack_type=='projected_gradient_descent':
        return ProjectedGradientDescentPyTorch(self.classifier,
                                             eps=self.epsilon,
                                             eps_step=self.eps_iter,
                                             max_iter=self.iterations,
                                             norm=self.norm,
                                             **kwargs)
    elif attack_type=='auto_attack':
        return AutoAttack(estimator=self.classifier,
                        eps=self.epsilon,
                        eps_step=self.eps_iter,
                        norm=self.norm)
    elif attack_type=='auto_projected_gradient_descent':
        return AutoProjectedGradientDescent(estimator=self.classifier,
                                          eps=self.epsilon,
                                          eps_step=self.eps_iter,
                                          norm=self.norm,
                                          max_iter=self.iterations,
                                          **kwargs)
    elif attack_type=='auto_conjugate_gradient':
        return AutoConjugateGradient(estimator=self.classifier,
                                   eps=self.epsilon,
                                   eps_step=self.eps_iter,
                                   norm=self.norm,
                                   max_iter=self.iterations,
                                   **kwargs)
    elif attack_type=='carlini_wagner_linf':
        return CarliniLInfMethod(self.classifier,
                               max_iter=self.second_attack_iters,
                               **kwargs)
    elif attack_type=='carlini_wagner_l2':
        return CarliniL2Method(self.classifier,
                               max_iter=self.second_attack_iters,
                               **kwargs)
    elif attack_type=='newton_fool':
        return NewtonFool(self.classifier,
                        max_iter=self.iterations,
                        **kwargs)
    elif attack_type=='deep_fool':
        return DeepFool(self.classifier,
                      max_iter=self.iterations,
                      epsilon=self.eps_iter,
                      **kwargs)
    elif attack_type=='elastic_net':
        return ElasticNet(self.classifier,
                      max_iter=self.second_attack_iters)
    elif attack_type=='frame_saliency':
        attacker = BasicIterativeMethod(self.classifier,
                                                 eps=self.epsilon,
                                                 eps_step=self.eps_iter,
                                                 max_iter=self.iterations,
                                      )
        return FrameSaliencyAttack(self.classifier,
                                 attacker,
                                 method='iterative_saliency')
    elif attack_type=='hop_skip_jump':
        return HopSkipJump(self.classifier,
                         norm=self.norm,
                         max_iter=self.second_attack_iters)
    else:
        raise ValueError(f'Attack type "{attack_type}" not supported!')

Plug-in function for attack with early stopping

In [None]:
def attack_with_early_stopping(classifier, x, y, max_iterations, attacker, verbose: bool = True):
    label_flipped = False
    count = 0
    start_time = time.time()

    x = x.unsqueeze(0).numpy()
    y = y.numpy()

    outputs = classifier.predict(x)
    _, clean_predicted = torch.max(torch.tensor(outputs).data, 1)
     
    if clean_predicted.item()!=int(y):
        print('Misclassified input. Not attacking.')
        end_time = time.time()
        return x, end_time - start_time

    for i in range(max_iterations):

        adv_inputs = attacker.generate(x, y=np.expand_dims(y, axis=0))

        outputs = classifier.predict(adv_inputs)
        _, predicted = torch.max(torch.tensor(outputs).data, 1)

        label_flipped = bool(predicted.item()!=int(y))

        if label_flipped:
            if verbose:
              print(f'\nIterations for successful attack: {i+1} with processing time: {elapsed_time:.3f} seconds\n')
            break

        x = adv_inputs.copy()
    
    end_time = time.time()
    return adv_inputs, end_time - start_time


In [None]:
#criterion and optimizer do not matter for the evaluation-only in this notebook
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(net.parameters(), lr=0.01)

classifier = PyTorchClassifier(model=net,
                               loss=criterion,
                               optimizer=optimizer,
                               input_shape=(3, 32, 32),
                               nb_classes=10)

Clever Score Calculation

In [None]:
def clever_score_calculation(classifier, xtest, ytest, max_epsilon, nb_batch, batch_size, norm):
  # Calculate CLEVER score
  torch.cuda.empty_cache()

  # Convert the reshaped tensor to a numpy array
  xtest_np = xtest.numpy()

  # Initialize lists to store CLEVER scores and corresponding image IDs
  images_id, clever_scores, runtimes = [], [], []

  # Iterate through each image for CLEVER score calculation
  for image in range(len(xtest)):
    start_time = time.time()
    # Calculate CLEVER score using the provided classifier and parameters
    clever_score = clever_u(classifier,
                              x=xtest_np[image],
                              nb_batches=nb_batch,
                              batch_size=batch_size,
                              radius=maximum_adversarial_distance,
                              norm=norm,
                              pool_factor=3)

    end_time = time.time()
    elapsed_time = end_time - start_time

    # Append the calculated CLEVER score to the list
    clever_scores.append(clever_score)

    # Append the image ID to the list
    images_id.append(image)

    # Append runtime for each image
    runtimes.append(elapsed_time)

    # Print the calculated CLEVER score for the current image
    print(f"Image: {image}, Score: {clever_score}, Runtime: {elapsed_time} sec")

  results_dict = {
      'images_id': images_id,
      'clever_score': clever_scores,
      'runtime': runtimes}
  print(f'\nTotal runtime for {len(xtest)} images is {np.sum(results_dict["runtime"])} seconds\n')
  return results_dict

Combined Calculation

In [None]:
eps_iter_dict = {
    'inf': 0.0003,
    '1': 0.2,
    '2': 0.005}

In [None]:
norm = 2
max_iterations = 1000
eps_iter = eps_iter_dict[str(norm)]
epsilon = max_iterations * eps_iter

splitsize = 20

In [None]:
print(eps_iter)

In [None]:
xtest, ytest = load_dataset(dataset_split=splitsize)

In [None]:
clever_configs = [(5, 5),
           (10, 20),
           (50, 100),
           (500, 1024)]

In [None]:
def combined_adv_dist_clever_score(classifier, xtest, ytest, epsilon, eps_iter, norm, max_iterations, get_image: bool = False, verbose: bool = True):

    adversarial_distance_list, clever_score_list = [], []

    attacks = AdversarialAttacks(classifier=classifier,
                              epsilon=epsilon,
                              eps_iter=eps_iter,
                              norm=norm,
                              iterations=1,
                              second_attack_iterations=40)
    
    iterative_attack_type = 'projected_gradient_descent'
    attacker1 = attack_wrapper.init_attacker(iterative_attack_type)
    
    if norm==1:
        second_attack_type='elastic_net'
    elif norm==2:
        second_attack_type='carlini_wagner_l2'
    else:
        second_attack_type='hop_skip_jump'
    attacker_2 = attacks.init_attacker(second_attack_type)


    correct_prediction_1, correct_prediction_2 = 0, 0
  
    for i, x in enumerate(xtest): 
    
        """First Attack"""
        x_adversarial_1, runtime_1 = attack_with_early_stopping(classifier=classifier,
                                                            x=x,
                                                            y=ytest[i],
                                                            max_iterations=max_iterations,
                                                            attacker=attacker1,
                                                            verbose=False)

        x_tensor = torch.tensor(x)
        x_adversarial_tensor = torch.tensor(x_adversarial_1)

        # Adversarial accuracy calculation
        output_adversarial_1 = classifier.predict(x_adversarial_1)
        _, predicted_adversarial_1 = torch.max(torch.tensor(output_adversarial_1).data, 1)
        correct_prediction_1 += (predicted_adversarial_1.item() == int(ytest[i]))
        
        distance_1 = torch.norm((x_tensor - x_adversarial_tensor), p=float(norm))
        """First Attack"""

        """Second Attack"""
        #print(f'second_attack_type: {second_attack_type}')
        x_adversarial_2 = attacker_2.generate(x=x.unsqueeze(0).numpy(),
                                           y=np.expand_dims(ytest[i].numpy(), axis=0))

        x_tensor = torch.tensor(x)
        x_adversarial_tensor = torch.tensor(x_adversarial_2)

        # Adversarial accuracy calculation
        output_adversarial_2 = classifier.predict(x_adversarial_2)
        _, predicted_adversarial_2 = torch.max(torch.tensor(output_adversarial_2).data, 1)
        correct_prediction_2 += (predicted_adversarial_2.item() == int(ytest[i]))

        distance_2 = torch.norm((x_tensor - x_adversarial_tensor), p=float(norm))
        """Second Attack"""

        
        """Distance comparison"""
        print(f'\n{attack_type}: {distance_1}\n{second_attack_type}: {distance_2}\n')
        if distance_1 > distance_2:
          print(f'\n{second_attack_type} attacks better than {attack_type} for norm {norm}\n')
          adversarial_distance_list.append(distance_2.item())
        else:
          adversarial_distance_list.append(distance_1.item())
        """Distance comparison"""

        if verbose:
          print(f'Image {i}\t\tAttack: {attack_type}\t\tAdversarial_distance: {distance_1:.5f}\t\tRuntime: {runtime_1:5f} seconds')

        
    adversarial_accuracy_1 = (correct_prediction_1 / len(xtest)) * 100
    adversarial_accuracy_2 = (correct_prediction_2 / len(xtest)) * 100
    print(f'Adversarial accuracy of '{adversarial_accuracy_1} ' / '{adversarial_accuracy_2} ' for iterative / second attacks.' 
          'If the lower values is not close to 0, increase attack strength for accurate adversarial distance estimation!')
        
        
    """Clever Score Calculation"""
    max_adv_dist = max(adversarial_distance_list)
    #max_adv_dist = 0.3870657980442047
    print(f'\nClever calculation will be done with maximum adversarial distance: {max_adv_dist}\n')

    clever_configs_results = {}
    for nb_batch, batch_size in clever_configs:
    print(f'Config: [{nb_batch}, {batch_size}]')
    results_dict_clever = clever_score_calculation(classifier=classifier,
                                                    xtest=xtest,
                                                    ytest=ytest,
                                                    max_epsilon=max_adv_dist,
                                                    nb_batch=nb_batch,
                                                    batch_size=batch_size,
                                                 norm=norm)
    clever_configs_results[f'{nb_batch}-{batch_size}'] = results_dict_clever
    """Clever Score Calculation"""

    """
    TBD: adding code sorted clever score list wrt adv dist 
    """
    
    return adversarial_distance_list, clever_configs_results

In [None]:
adv_dist, clever_sc = combined_adv_dist_clever_score(classifier,
                                                   xtest=xtest,
                                                   ytest=ytest,
                                                   epsilon=epsilon,
                                                   eps_iter=eps_iter,
                                                   norm=norm,
                                                   max_iterations=max_iterations)

In [None]:
print(clever_sc)

In [None]:
print(adv_dist)

In [None]:
import json

json_file = f'/kaggle/working/clever_data_robust_L{norm}.json'

with open(json_file, 'w') as f:
    json.dump(clever_sc, f, sort_keys=True)

In [None]:
sorted_indices = np.argsort(adv_dist)
clever_score_sorted = np.array(clever_sc['500-1024']['clever_score'])[sorted_indices]
adv_distance_sorted = np.array(adv_dist)[sorted_indices]

plt.figure(figsize=(10, 5))
plt.scatter(clever_sc['500-1024']['images_id'], clever_score_sorted, alpha=0.5, label='Clever Score')
plt.scatter(clever_sc['500-1024']['images_id'], adv_distance_sorted, label='Adversarial Distance')
plt.legend()
plt.xticks(clever_sc['500-1024']['images_id'])
plt.tight_layout()