<a href="https://colab.research.google.com/github/AristiPap/Thesis_Stuff/blob/main/BoundaryAttack_Hybrid_Diffe_Bounds.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import timeit
from concurrent import futures
from concurrent.futures import ThreadPoolExecutor

from __future__ import print_function
try:
	raw_input
except:
	raw_input = input

from tensorflow.keras import layers
from tensorflow import keras
import tensorflow as tf

import tensorflow_datasets as tfds

tfds.disable_progress_bar()

import matplotlib.pyplot as plt
import numpy as np



import numpy as np
import time
import os
from PIL import Image
import sys
from keras.preprocessing import image
from tensorflow.keras.applications.resnet50 import ResNet50
from tensorflow.keras.applications.resnet50 import preprocess_input, decode_predictions

RESNET_MEAN = np.array([103.939, 116.779, 123.68])

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
def sphere_perturbation(delta, prev_sample, target_sample):
    """Generate orthogonal perturbation."""
    perturb = np.random.randn(1, 224, 224, 3)
    perturb /= np.linalg.norm(perturb, axis=(1, 2))
    perturb *= delta * np.mean(get_diff(target_sample, prev_sample))
    # Project perturbation onto sphere around target
    # Orthorgonal vector to sphere surface
    diff = (target_sample - prev_sample).astype(np.float32)
    diff /= get_diff(target_sample, prev_sample)  # Orthogonal unit vector
    # We project onto the orthogonal then subtract from perturb
    # to get projection onto sphere surface
    perturb -= (np.vdot(perturb, diff) / np.linalg.norm(diff)**2) * diff
    # Check overflow and underflow
    overflow = (prev_sample + perturb) - 255 + RESNET_MEAN
    perturb -= overflow * (overflow > 0)
    underflow = -RESNET_MEAN
    perturb += underflow * (underflow > 0)
    return perturb

In [None]:
def forward_perturbation(epsilon, prev_sample, target_sample):
    """Generate forward perturbation."""
    perturb = (target_sample - prev_sample).astype(np.float32)
    perturb /= get_diff(target_sample, prev_sample)
    perturb *= epsilon
    return perturb


def get_converted_prediction(sample, classifier):
    """
    The loss of precision often causes the label of the image to change, particularly
    because we are very close to the boundary of the two classes.
    This function checks for the label of the exported sample
    by simulating the export process.
    """
    #sample = (sample + RESNET_MEAN).astype(np.uint8).astype(np.float32) - RESNET_MEAN
    label = decode_predictions(classifier.predict(sample), top=1)[0][0][1]
    return label


def save_image(sample, classifier, folder):
    """Export image file."""
    label = get_converted_prediction(np.copy(sample), classifier)
    print("Labeling from function",label)
    sample = sample.reshape(224, 224, 3)
    # Reverse preprocessing, see https://github.com/keras-team/keras/blob/master/keras/applications/imagenet_utils.py
    mean = [103.939, 116.779, 123.68]
    sample[..., 0] += mean[0]
    sample[..., 1] += mean[1]
    sample[..., 2] += mean[2]
    sample = sample[..., ::-1].astype(np.uint8)
    # Convert array to image and save
    sample = Image.fromarray(sample)
    id_no = time.strftime('%Y%m%d_%H%M%S', datetime.datetime.now().timetuple())
    # Save with predicted label for image (may not be adversarial due to uint8 conversion)
    sample.save(os.path.join("/content/drive/MyDrive/Thesis_notebooks/my_boundary_attack/boundary-attack-master/images", folder,"{}_{}.png".format(id_no, label)))

#preprocessing with keras
def preprocess(sample_path):
    """Load and preprocess image file."""
    img = image.load_img(sample_path, target_size=(224, 224))
    x = image.img_to_array(img)
    x = np.expand_dims(x, axis=0)
    x = preprocess_input(x)
    return x

#keep mean error for each iteration
def get_diff(sample_1, sample_2):
    """Channel-wise norm of difference between samples."""
    sample_1 = sample_1.reshape(3, 224, 224)
    sample_2 = sample_2.reshape(3, 224, 224)
    diff = []
    for i, channel in enumerate(sample_1):
      diff.append(np.linalg.norm((channel - sample_2[i]).astype(np.float32)))
    return np.array(diff)
    #return np.linalg.norm(sample_1 - sample_2, axis=(1, 2))

In [None]:
class MyCNN(Sequential):
  def __init__(self,input_size,filter_size=128, kernel_size = (3,3), dropout_flg = True, dropout_rate = 0.3):
    super(MyCNN, self).__init__()
    self.input_size = input_size
    self.filter_size = filter_size
    self.kernel_size = kernel_size
    self.dropout_flg = dropout_flg
    self.dropout_rate = dropout_rate

    self.add(L.Conv2D(self.filter_size, kernel_size=self.kernel_size,padding='same', activation='relu', input_shape=self.input_size))
    if dropout_flg:
      self.add(L.Dropout(self.dropout_rate))
    self.add(L.MaxPooling2D(pool_size=(2, 2)))

    self.add(L.Conv2D(self.filter_size/2, kernel_size=self.kernel_size,padding='same', activation='relu',input_shape=self.input_size))
    if dropout_flg:
      self.add(L.Dropout(self.dropout_rate))
    self.add(L.MaxPooling2D(pool_size=(2, 2)))

    self.add(L.Conv2D(self.filter_size/2, kernel_size=self.kernel_size,padding='same', activation='relu'))
    if dropout_flg:
      self.add(L.Dropout(self.dropout_rate))
    self.add(L.MaxPooling2D(pool_size=(2, 2)))

    self.add(L.Conv2D(self.filter_size/2, kernel_size=self.kernel_size,padding='same', activation='relu'))
    self.add(L.MaxPooling2D(pool_size=(2, 2)))

    if dropout_flg:
      self.add(L.Dropout(self.dropout_rate))
      
    self.add(L.Flatten())
    self.add(L.Dense(32))
    self.add(L.Dropout(0.2))
    self.add(L.Dense(num_classes, activation='softmax'))

In [None]:
from scipy.spatial import distance

class boundary_attack():
  def __init__(self,init_sample_path,target_sample_path,dir_path):
    # load the pre-trained ResNet50 model
    print("[INFO] loading pre-trained ResNet50 model...")
    self.classifier = ResNet50(weights='imagenet')

    #init paths and folders for the statistics
    self.initial_sample = preprocess(init_sample_path)
    self.target_sample = preprocess(target_sample_path)

    self.predictions_initial_sample = decode_predictions(self.initial_sample, top=3)[0]
    self.predictions_target_sample = decode_predictions(self.target_sample, top=3)[0]

    

    self.folder = time.strftime('[%d%m%Y]_%H%M%s', time.localtime())
    os.mkdir(os.path.join(dir_path, self.folder))
    save_image(np.copy(self.initial_sample), self.classifier, self.folder)
    #classifier initialised with our picture (in this case picture of seal that you can faintly see behind the eel when the attack is executed)
    self.attack_class = np.argmax(self.classifier.predict(self.initial_sample))
    #Our target image is the eel.The who,e point is to create an image that looks almost exactly like the eel but has pertrubations "inspired by the eel", 
    #so that it's misclassified
    self.target_class = np.argmax(self.classifier.predict(self.target_sample))

    self.adversarial_sample = self.initial_sample
    self.n_steps = 0
    self.n_calls = 0
    self.epsilon = 1.
    self.delta = 0.1
    self.errors = []
    self.boundaries_found = 0
    self.repetitions = 0
    self.prev_dist = 0
    self.no_bound_found = 0

  def _attack(self):
    # Move first step to the boundary
    # get first adversarial point to begin initiating the attack 
    #this is the hybrid version, where we keep looping, till we find a closer adversarial point
    while True:
        trial_sample = self.adversarial_sample + forward_perturbation(self.epsilon, self.adversarial_sample, self.target_sample) - self.prev_dist
        prediction = self.classifier.predict(trial_sample.reshape(1, 224, 224, 3))
        self.n_calls += 1
        dist = distance.euclidean(prediction, self.classifier.predict(self.target_sample)) 
   
        if self.repetitions == 0:
          self.prev_dist = dist
        if dist < self.prev_dist:
          print("FOUND CLOSER BOUNDARY")
          print(dist)
        self.prev_dist = dist
        if self.repetitions > 1000 or self.no_bound_found > 100:
          break
        if np.argmax(prediction) == self.attack_class:
            self.adversarial_sample = trial_sample
            print("Found another bound")
            self.boundaries_found += 1
            #self.epsilon *= 0.9
        else:
          print("Didn't Find bound")
          self.epsilon *= 0.9
          self.no_bound_found += 1
          break
        self.repetitions += 1
    # Iteratively run attack
    while True:
        print("Step #{}...".format(self.n_steps))
        # Orthogonal step
        print("\tDelta step...")
        d_step = 0
        while True:
            d_step += 1
            print("\t#{}".format(d_step))
            trial_samples = []
            for i in np.arange(10):
                trial_sample = self.adversarial_sample + sphere_perturbation(self.delta, self.adversarial_sample, self.target_sample)
                trial_samples.append(trial_sample)
            
            trial_samples = np.asarray(trial_samples).reshape((-1,224,224,3))
            predictions = self.classifier.predict(trial_samples)
            self.n_calls += 10
            predictions = np.argmax(predictions, axis=1)
            d_score = np.mean(predictions == self.attack_class)
            if d_score > 0.0:
                if d_score < 0.3:
                    self.delta *= 0.9
                elif d_score > 0.7:
                    self.delta /= 0.9
                self.adversarial_sample = np.array(trial_samples)[np.where(predictions == self.attack_class)[0][0]]
                break
            else:
                self.delta *= 0.9
        # Forward step
        print("\tEpsilon step...")
        e_step = 0
        while True:
            e_step += 1
            print("\t#{}".format(e_step))
            trial_sample = self.adversarial_sample + forward_perturbation(self.epsilon, self.adversarial_sample, self.target_sample)
            prediction = self.classifier.predict(trial_sample)
            self.n_calls += 1
            #adversarial step
            if np.argmax(prediction) == self.attack_class:
                temp = get_converted_prediction(np.copy(trial_sample), self.classifier)
                print(temp)
                self.adversarial_sample = trial_sample
                self.epsilon /= 0.5
                break
            elif e_step > 500:
                break
            else:
                self.epsilon *= 0.5

        self.n_steps += 1
        # keep some checkpoints so that we save the pictures whilst the attack progresses
        chkpts = [250, 750]
        if self.n_steps % 10 == 0:# in chkpts:
            print("{} steps".format(self.n_steps))
            save_image(np.copy(self.adversarial_sample), self.classifier, self.folder)
        diff = np.mean(get_diff(self.adversarial_sample, self.target_sample))
        self.errors.append(diff)
        #stop at 1000 steps ~= 40 minutes
        if diff <= 1e-3 or e_step > 500 or self.n_steps > 7000:
            print("{} steps".format(self.n_steps))
            print("Mean Squared Error: {}".format(diff))
            save_image(np.copy(self.adversarial_sample), self.classifier, self.folder)
            break

        print("Mean Squared Error: {}".format(diff))
        print("Calls: {}".format(self.n_calls))
        print("Attack Class: {}".format(self.attack_class))
        print("Target Class: {}".format(self.target_class))
        print("Adversarial Class: {}".format(np.argmax(prediction)))


In [None]:
import datetime
init_path = '/content/drive/MyDrive/Thesis_notebooks/my_boundary_attack/boundary-attack-master/images/original/awkward_moment_seal.png'
target_path ='/content/drive/MyDrive/Thesis_notebooks/my_boundary_attack/boundary-attack-master/images/original/bad_joke_eel.png'
_dir_path = "/content/drive/MyDrive/Thesis_notebooks/my_boundary_attack/boundary-attack-master/images"
model = boundary_attack(init_path,target_path,_dir_path)

[INFO] loading pre-trained ResNet50 model...
Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/resnet/resnet50_weights_tf_dim_ordering_tf_kernels.h5


ValueError: ignored

In [None]:
model._attack()

In [None]:
import matplotlib.pyplot as plt

x_data = list(range(0, len(model.errors)))
y_data = model.errors

fig = plt.figure()
ax = fig.add_subplot(1, 1, 1)

ax.plot(x_data, y_data)
ax.set_xlabel('steps to complete')
ax.set_ylabel('mse')

plt.show()