In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import tensorflow as tf
import keras
from keras.models import Sequential
from keras.preprocessing.image import ImageDataGenerator
from keras.layers import Dense, Flatten, BatchNormalization, Dropout
from tensorflow.keras.applications import InceptionV3
from glob import glob
from imageio import imwrite

In [None]:
from tqdm.notebook import tqdm

In [None]:
# Dataset location and loader settings.
data_dir = "/kaggle/input/dataset/dataset_dr/dataset_dr"
batch_size = 32

# Training images are rescaled to [0, 1] and randomly augmented.
train_datagen = ImageDataGenerator(rescale=1./255,
    shear_range=0.2,
    zoom_range=0.2,
    horizontal_flip=True,
    validation_split=0.2) # set validation split

# Held-out images are only rescaled: random shear/zoom/flip on the validation
# subset would make every evaluation below noisy and non-repeatable.
# The same validation_split keeps the train/validation partition identical.
validation_datagen = ImageDataGenerator(rescale=1./255,
    validation_split=0.2)

train_generator = train_datagen.flow_from_directory(
    data_dir,
    target_size=(227, 227),
    batch_size=batch_size,
    seed=1,
    class_mode='categorical',
    subset='training') # set as training data

validation_generator = validation_datagen.flow_from_directory(
    data_dir, # same directory as training data
    target_size=(227, 227),
    batch_size=batch_size,
    seed=1,
    class_mode='categorical',
    subset='validation') # set as validation data

In [None]:
# Load the pre-trained Keras classifier (HDF5 file) that the attacks below target.
model = keras.models.load_model('/kaggle/input/ml-cybersec/model.h5')

In [None]:
# Evaluate the loaded model on the batches produced by the validation generator.
model.evaluate(validation_generator)

In [None]:
# To retrieve full dataset from the Image Generators
validation_generator.reset()
# Pull exactly one epoch of batches, then concatenate once.  Growing the arrays
# with np.append inside the loop would re-copy everything already collected on
# every batch.
val_batches = [next(validation_generator) for _ in range(len(validation_generator))]
x_test = np.concatenate([images for images, _ in val_batches], axis=0)
y_test = np.concatenate([labels for _, labels in val_batches], axis=0)
del val_batches  # the per-batch copies are no longer needed
print(x_test.shape, y_test.shape)

# FGSM Untargeted

In [None]:
def fgsm_untargeted(image, label):
    """Return the sign of the loss gradient with respect to ``image``.

    Args:
        image: Input passed to the global ``model``; it is cast to float32.
        label: Target for a single sample; a leading batch axis is added
            before it is compared with the model output.

    Returns:
        A tensor shaped like ``image`` holding the element-wise sign of the
        categorical cross-entropy gradient.
    """
    batch = tf.cast(image, tf.float32)
    target = np.expand_dims(label, axis = 0)
    criterion = tf.keras.losses.CategoricalCrossentropy()

    with tf.GradientTape() as tape:
        # `batch` is a plain tensor, so it has to be watched explicitly.
        tape.watch(batch)
        loss = criterion(target, model(batch))

    # Only the direction (sign) of the gradient is kept.
    return tf.sign(tape.gradient(loss, batch))

In [None]:
def peek_pert(i = 0, untargted = 1):
    """Plot and save the FGSM perturbation pattern of the i-th test image.

    Args:
        i: Index into ``x_test`` / ``y_test``.
        untargted: Truthy selects ``fgsm_untargeted``; falsy selects
            ``fgsm_targeted``.  (The misspelt name is kept so existing keyword
            calls continue to work.)

    Side effects:
        Writes 'fgsm_perturbation_pattern.png' and shows the figure.
    """
    image = x_test[i]
    label = y_test[i]
    # Add the batch axis without hard-coding the image size.
    batch = image[np.newaxis, ...]

    if untargted:
        perturbations = fgsm_untargeted(batch, label)
    else:
        # NOTE(review): fgsm_targeted is not defined in the visible part of the
        # notebook - confirm it exists before using this branch.
        perturbations = fgsm_targeted(batch, label)

    # The sign pattern takes values in {-1, 0, 1}.  imshow clips float RGB data
    # to [0, 1], which would render every negative entry as black, so the
    # pattern is mapped to [0, 1] for display.
    pattern = perturbations.numpy()[0] * 0.5 + 0.5

    # show perturbation pattern
    plt.figure()
    plt.imshow(pattern)
    plt.title('Perturbation Pattern')
    plt.savefig('fgsm_perturbation_pattern.png')
    plt.show()
    

In [None]:
def peek_adv(i = 0, e = 0.1, untargeted = 1):
    """Run a single-step FGSM attack on the i-th test image and display it.

    Args:
        i: Index into ``x_test`` / ``y_test``.
        e: Epsilon, the step size applied to the signed gradient.
        untargeted: Truthy adds the ``fgsm_untargeted`` perturbation; falsy
            subtracts the ``fgsm_targeted`` perturbation.

    Side effects:
        Prints the true class, the prediction on the clean image and the
        prediction on the adversarial image; writes
        'fgsm_perturbed_image.png' and shows the figure.
    """
    image = x_test[i]
    label = y_test[i]
    # Add the batch axis without hard-coding the image size.
    batch = image[np.newaxis, ...]

    if untargeted:
        perturbations = fgsm_untargeted(batch, label).numpy()
        adversarial = batch + perturbations * e
    else:
        # NOTE(review): fgsm_targeted is not defined in the visible part of the
        # notebook - confirm it exists before using this branch.
        perturbations = fgsm_targeted(batch, label).numpy()
        adversarial = batch - perturbations * e

    # Keep the adversarial example a valid image, matching the clipping done
    # when the perturbed train/test sets are built later in the notebook.
    adversarial = np.clip(adversarial, 0.0, 1.0)

    # The true class comes from the one-hot label, not from the model.
    print('Actual label: ', np.argmax(label))
    print('Clean prediction: ', tf.keras.backend.argmax(model(batch)).numpy()[0])
    print('Prediction: ', tf.keras.backend.argmax(model(adversarial)).numpy()[0])

    plt.imshow(adversarial[0])
    plt.savefig('fgsm_perturbed_image.png')
    plt.show()
    

In [None]:
# peek_pert(i = 200, untargted = 1)

In [None]:
# peek_adv(i = 200, e = 0.1, untargeted = 1)

In [None]:
def test(epsilon, untargeted = 1):
    """Measure accuracy on ``x_test`` after a single-step FGSM attack.

    Args:
        epsilon: Step size applied to the signed gradient.
        untargeted: Truthy adds the ``fgsm_untargeted`` perturbation; falsy
            subtracts the ``fgsm_targeted`` perturbation.

    Returns:
        Accuracy in percent: the share of samples whose adversarial prediction
        equals the true class taken from ``y_test``.

    Raises:
        ValueError: If ``x_test`` is empty.
    """
    total = len(x_test)
    if total == 0:
        raise ValueError('x_test is empty; nothing to evaluate')

    hit = 0
    for image, label in zip(x_test, y_test):
        # Add the batch axis without hard-coding the image size.
        batch = image[np.newaxis, ...]

        if untargeted:
            perturbations = fgsm_untargeted(batch, label).numpy()
            adversarial = batch + perturbations * epsilon
        else:
            # NOTE(review): fgsm_targeted is not defined in the visible part of
            # the notebook - confirm it exists before using this branch.
            perturbations = fgsm_targeted(batch, label).numpy()
            adversarial = batch - perturbations * epsilon

        # Keep the adversarial example inside the valid pixel range.
        adversarial = np.clip(adversarial, 0.0, 1.0)

        # Accuracy is measured against the ground-truth class.  Comparing with
        # the clean prediction would count samples the model already gets
        # wrong as hits.
        truth = np.argmax(label)
        pred = tf.keras.backend.argmax(model(adversarial)).numpy()[0]

        if truth == pred:
            hit += 1

    accuracy = (hit / total) * 100
    print(f'Epsilon: {epsilon}, test accuracy = {hit} / {total} = {accuracy}%')
    return accuracy

In [None]:
# FGSM step sizes, expressed as fractions of the 0-255 pixel scale.
epsilons = [level / 255 for level in (1, 5, 10, 15, 20)]
# Accuracy (in percent) of the untargeted attack at each step size.
accuracies_untargeted = [test(eps, untargeted = 1) for eps in epsilons]

In [None]:
# # Original test accuracy
# model.evaluate(x_test, y_test)

In [None]:
# Display the accuracy (in percent) recorded for each epsilon.
accuracies_untargeted

In [None]:
# Prepend epsilon 0 (no perturbation) so the plotted curve starts at the unattacked model.
epsilons_plot = [0] + epsilons
epsilons_plot

In [None]:
# Prepend the accuracy used for epsilon 0.
# NOTE(review): 83.33 is hard-coded - presumably the clean accuracy reported by
# model.evaluate; confirm it still matches the current run.
accuracies_untargeted_plots = [83.33] + accuracies_untargeted
accuracies_untargeted_plots

In [None]:
import seaborn as sns
sns.set_style('darkgrid')

# Font sizes for titles, axis labels, ticks, legend and default text.
for rc_group, rc_options in (
    ('axes', {'titlesize': 18}),
    ('axes', {'labelsize': 14}),
    ('xtick', {'labelsize': 13}),
    ('ytick', {'labelsize': 13}),
    ('legend', {'fontsize': 13}),
    ('font', {'size': 13}),
):
    plt.rc(rc_group, **rc_options)

# Accuracy of the FGSM-attacked model as a function of epsilon.
fig, ax = plt.subplots(figsize=(5,5))
ax.plot(epsilons_plot, accuracies_untargeted_plots, "+-")
ax.set_title("Accuracy vs Epsilon")
ax.set_xlabel("Epsilon")
ax.set_ylabel("Accuracy")
fig.savefig('fgsm_accuracy_with_epsilon_plot.png')
plt.show()

# Iterative FGSM (BIM)

In [None]:
# Show a 3x3 grid of randomly chosen validation images with their labels.
figure = plt.figure(figsize=(10,8))
cols, rows = 3,3
for slot in range(1, cols*rows+1):
    # A length-1 index array keeps the leading axis on the selected sample.
    index = np.random.randint(x_test.shape[0], size=1)
    img = x_test[index]
    label = y_test[index]
    panel = figure.add_subplot(rows, cols, slot)
    panel.set_title("true label: {}".format(label))
    panel.axis("off")
    panel.imshow(img.squeeze(), cmap="gray")
plt.show()

In [None]:
def generate_perturb_untarget_im_iter(iteration, model, X, Y, epislon):
    """Apply repeated signed-gradient steps to ``X`` (iterative FGSM).

    Args:
        iteration: Number of gradient steps to take.
        model: Callable mapping an image batch to class probabilities.
        X: Input images; converted to a float32 tensor.
        Y: Labels compared with the model output via categorical cross-entropy.
        epislon: Size of each step along the sign of the gradient.

    Returns:
        A one-element list holding the perturbed batch.  After every step the
        batch is clipped to [0, 1].
    """
    criterion = tf.keras.losses.CategoricalCrossentropy()
    current = tf.convert_to_tensor(X, dtype=tf.float32)

    for _ in range(iteration):
        with tf.GradientTape() as tape:
            tape.watch(current)
            loss = criterion(Y, model(current))
        step = epislon * tf.sign(tape.gradient(loss, current))
        current = tf.clip_by_value((current + step), clip_value_min = 0, clip_value_max = 1)

    return [current]

In [None]:
# Run 35 steps of size 1/255 on the first 100 validation samples.
x_perturb = generate_perturb_untarget_im_iter(35, model, x_test[:100], y_test[:100], 1/255)

In [None]:
# Original test accuracy on the full collected validation arrays.
model.evaluate(x_test, y_test)

In [None]:
# Test accuracy on perturbed images.
# x_perturb is the one-element list returned by generate_perturb_untarget_im_iter.
model.evaluate(x_perturb, y_test[:100])

In [None]:
# Side-by-side view of validation sample 76 before and after the iterative attack.
figure = plt.figure(figsize=(10,8))
index = [76]

img1, label1 = x_test[index], y_test[index]
left_panel = figure.add_subplot(1, 2, 1)
left_panel.set_title("Original Image")
left_panel.axis("off")
left_panel.imshow(img1.squeeze(), cmap="gray")

# x_perturb[0] is the perturbed batch; pick the same sample from it.
img2, label2 = np.array(x_perturb[0][index[0]]), y_test[index]
right_panel = figure.add_subplot(1, 2, 2)
right_panel.set_title("Perturbed Image")
right_panel.axis("off")
right_panel.imshow(img2.squeeze(), cmap="gray")

plt.savefig('BIM_original_with_perturbed_iterations35.png')
plt.show()

# PGD

In [None]:
# PGD settings read by the generation loop below.
iterations = 20  # number of gradient steps per image
# NOTE(review): images are scaled to [0, 1] and the perturbation is clipped to
# +/- epsilon, so a step of 2 is far larger than the allowed radius - confirm
# whether 2/255 was intended.
alpha = 2  # step size applied to the signed gradient
epsilon = 1/255  # radius of the allowed perturbation around the clean image

In [None]:
# Number of validation labels available.
len(y_test)

In [None]:
# imwrite('/kaggle/working/'+"1002"+'_gen_img.png',np.clip(np.expand_dims(x_test[0], axis=0), 0, 255).astype('uint8'))
# print(x_test[0].shape)

In [None]:
# PGD generation: craft adversarial versions of the first 15 validation images.
# Reads the module-level settings `iterations`, `alpha` and `epsilon`.
image_count = 0
perturbed_images_pgd = []  # adversarial images, rescaled to the 0-255 range
orig_label = []            # labels of the attacked samples, in the same order
pgd_loss = tf.keras.losses.CategoricalCrossentropy()  # built once, reused for every step

for i in tqdm(range(min(15, len(y_test))), desc='PGD'):
    # Keep the batch axis for the whole attack so shapes stay (1, H, W, C) even
    # when `iterations` is 0.
    clean = tf.convert_to_tensor(x_test[i][np.newaxis, ...], dtype=tf.float32)
    target = tf.convert_to_tensor(y_test[i][np.newaxis, ...], dtype=tf.float32)

    # Random start inside the epsilon ball around the clean image.
    gen_img = clean + tf.random.uniform(clean.shape, minval=-epsilon, maxval=epsilon, dtype=tf.dtypes.float32)

    for _ in range(iterations):
        with tf.GradientTape() as tape:
            tape.watch(gen_img)
            predictions = model(gen_img)
            loss = pgd_loss(target, predictions)
        # Take the gradient after leaving the tape context so the gradient
        # computation itself is not recorded.
        grads = tape.gradient(loss, gen_img)
        gen_img = gen_img + (alpha * tf.sign(grads))
        # Project back into the epsilon ball around the clean image.
        gen_img = tf.clip_by_value(gen_img, clean - epsilon, clean + epsilon)

    image_count += 1
    orig_label.append(y_test[i])
    gen_image = np.squeeze(gen_img.numpy() * 255, axis=0)
    perturbed_images_pgd.append(gen_image)
    imwrite('/kaggle/working/'+str(image_count)+'_gen_img.png',np.clip(gen_image, 0, 255).astype('uint8'))

In [None]:
# import cv2
# generated_images = []
# for i in range(1, len(y_test)+1):
#     x = cv2.imread('/kaggle/working/'+str(i)+'_gen_img.png')
#     generated_images.append(x)
# generated_images = tf.convert_to_tensor(generated_images)
# orig_label = tf.convert_to_tensor(orig_label)

In [None]:
# original_images = []
# for i in range(1, len(y_test)+1):
#     x = cv2.imread('/kaggle/working/'+str(i)+'_orig_img.png')
#     original_images.append(x)
# original_images = tf.convert_to_tensor(generated_images)

In [None]:
# Stack the list of PGD images into one array and clip it to the 0-255 range.
perturbed_images_pgd_clipped = np.clip(perturbed_images_pgd, 0, 255)
perturbed_images_pgd_clipped

In [None]:
# Shape of the stacked, clipped PGD images.
perturbed_images_pgd_clipped.shape

In [None]:
# Shape of a single PGD image.
perturbed_images_pgd_clipped[0].shape

In [None]:
from PIL import Image
# The clipped PGD array is floating point, while an RGB image needs 8-bit
# integers; passing floats straight to fromarray produces a garbled picture.
# With a uint8 (H, W, 3) array the RGB mode is inferred automatically.
img = Image.fromarray(perturbed_images_pgd_clipped[0].astype('uint8'))
img.save('my.png')
img.show()

In [None]:
# orig_label

In [None]:
# y_test

In [None]:
# The PGD images are stored on a 0-255 scale, but the model was fed inputs in
# [0, 1], so rescale before evaluating.  Pair them with `orig_label` (the labels
# of exactly the attacked samples) instead of the full `y_test`, whose length
# does not match.
model.evaluate(perturbed_images_pgd_clipped / 255.0, np.asarray(orig_label), verbose = 1)

# CW

In [None]:
import tensorflow as tf


__all__ = ['cw']

def cw(model, x, y=None, eps=1.0, ord_=2, T=2,
       optimizer=tf.optimizers.Adam(learning_rate=0.1), alpha=0.9,
       min_prob=0, clip=(0.0, 1.0)):
    """Build the Carlini-Wagner style attack ops for the batch ``x``.

    The input is mapped into sigmoid space, a trainable ``noise`` variable is
    added there, and the result is mapped back into the ``clip`` range.  The
    loss combines a classification term with a distance term.

    Args:
        model: Called as ``model(xadv, logits=True)`` and expected to return a
            ``(ybar, logits)`` pair.
            NOTE(review): a plain Keras model does not accept ``logits`` -
            confirm the model passed in supports this call.
        x: Input tensor with a fully known static shape.
        y: Optional target.  A scalar is broadcast to the batch; ``None``
            selects the least-likely class according to ``ybar``.
            NOTE(review): ``tf.one_hot`` needs integer class indices -
            confirm callers do not pass one-hot labels.
        eps: Weight of the classification term in the total loss.
        ord_: ``2`` selects the mean-squared distance; any other value selects
            the L-inf style branch.
        T: Scale used when mapping to and from sigmoid space.
        optimizer: Optimizer whose ``minimize`` is called on the loss.  The
            default instance is created once, when the function is defined.
        alpha: Factor used for ``tau`` in the L-inf branch.
        min_prob: Margin added inside the classification term.
        clip: ``(low, high)`` range of valid input values.

    Returns:
        ``(train_op, xadv, noise)``: the optimisation op, the adversarial
        tensor and the noise variable.
    """
    xshape = x.get_shape().as_list()
    noise = tf.compat.v1.get_variable('noise', xshape, tf.float32,
                            initializer=tf.initializers.zeros)

    # scale input to (0, 1)
    x_scaled = (x - clip[0]) / (clip[1] - clip[0])

    # change to sigmoid-space, clip to avoid overflow.
    z = tf.clip_by_value(x_scaled, 1e-8, 1-1e-8)
    xinv = tf.compat.v1.log(z / (1 - z)) / T

    # add noise in sigmoid-space and map back to input domain
    xadv = tf.sigmoid(T * (xinv + noise))
    xadv = xadv * (clip[1] - clip[0]) + clip[0]

    ybar, logits = model(xadv, logits=True)
    ydim = ybar.get_shape().as_list()[1]

    if y is not None:
        y = tf.cond(tf.equal(tf.rank(y), 0),
                    lambda: tf.fill([xshape[0]], y),
                    lambda: tf.identity(y))
    else:
        # we set target to the least-likely label
        y = tf.argmin(ybar, axis=1, output_type=tf.int32)

    # The mask is 0 at the target class and +inf elsewhere, so `yt` is the
    # target-class logit and `yo` the largest logit overall.
    mask = tf.one_hot(y, ydim, on_value=0.0, off_value=float('inf'))
    yt = tf.reduce_max(logits - mask, axis=1)
    yo = tf.reduce_max(logits, axis=1)

    # encourage to classify to a wrong category
    loss0 = tf.nn.relu(yo - yt + min_prob)

    axis = list(range(1, len(xshape)))
    ord_ = float(ord_)

    # make sure the adversarial images are visually close
    if 2 == ord_:
        # CW-L2 Original paper uses the reduce_sum version.  These two
        # implementation does not differ much.

        # loss1 = tf.reduce_sum(tf.square(xadv-x), axis=axis)
        loss1 = tf.reduce_mean(tf.square(xadv-x))
    else:
        # CW-Linf
        tau0 = tf.fill([xshape[0]] + [1]*len(axis), clip[1])
        # tf.get_variable is not part of the TF2 namespace; use the compat.v1
        # entry point, as is already done for `noise` above.
        tau = tf.compat.v1.get_variable('cw8-noise-upperbound', dtype=tf.float32,
                                        initializer=tau0, trainable=False)
        diff = xadv - x - tau

        # if all values are smaller than the upper bound value tau, we reduce
        # this value via tau*0.9 to make sure L-inf does not get stuck.
        # tf.to_float is likewise absent from TF2; tf.cast is the equivalent.
        # NOTE(review): this rebinds the Python name `tau` to a new tensor
        # rather than assigning to the variable - confirm this is intended.
        tau = alpha * tf.cast(tf.reduce_all(diff < 0, axis=axis), tf.float32)
        loss1 = tf.nn.relu(tf.reduce_sum(diff, axis=axis))

    loss = eps*loss0 + loss1
    # NOTE(review): passing a tensor loss to `minimize` is the TF1 graph-mode
    # calling convention - confirm it works with the optimizer in use.
    train_op = optimizer.minimize(loss, var_list=[noise])

    # We may need to update tau after each iteration.  Refer to the CW-Linf
    # section in the original paper.
    if 2 != ord_:
        train_op = tf.group(train_op, tau)

    return train_op, xadv, noise

In [None]:
# Tensor copies of the first 100 validation images and labels, used as input to cw().
x_test_tf = tf.convert_to_tensor(x_test[:100])
y_test_tf = tf.convert_to_tensor(y_test[:100])

In [None]:
# Build the CW attack for those tensors with the default settings (ord_=2, eps=1.0).
train_op, xadv, noise = cw(model, x_test_tf, y_test_tf)

In [None]:
# xadv was built from the first 100 validation samples, so it has to be scored
# against the matching 100 labels rather than the full label array.
# NOTE(review): train_op is never run in the visible notebook, so `noise` may
# still hold its zero initial value here - confirm the attack is optimised first.
model.evaluate(xadv, y_test[:100], verbose = 1)

# Adversarial Retraining

In [None]:
# PGD settings used to generate adversarial training images below.
iterations = 35  # number of gradient steps per image
# NOTE(review): images are scaled to [0, 1] and the perturbation is clipped to
# +/- epsilon, so a step of 2 is far larger than the allowed radius - confirm
# whether 2/255 was intended.
alpha = 2  # step size applied to the signed gradient
epsilon = 8/255  # radius of the allowed perturbation around the clean image


In [None]:
# To retrieve full training dataset from the Image Generators
train_generator.reset()
# Pull exactly one epoch of batches, then concatenate once.  Growing the arrays
# with np.append inside the loop would re-copy everything already collected on
# every batch.
train_batches = [next(train_generator) for _ in range(len(train_generator))]
x_train = np.concatenate([images for images, _ in train_batches], axis=0)
y_train = np.concatenate([labels for _, labels in train_batches], axis=0)
del train_batches  # the per-batch copies are no longer needed
print(x_train.shape, y_train.shape)

In [None]:
# Generating perturbed training images with PGD and writing them to disk next
# to the clean originals.  Reads the module-level settings `iterations`,
# `alpha` and `epsilon`.
image_count = 0
orig_label_train = []  # labels of the attacked samples, in the same order
pgd_train_loss = tf.keras.losses.CategoricalCrossentropy()  # built once, reused for every step

for i in range(len(y_train)):
    # Keep the batch axis for the whole attack so shapes stay (1, H, W, C) even
    # when `iterations` is 0.
    clean = tf.convert_to_tensor(x_train[i][np.newaxis, ...], dtype=tf.float32)
    target = tf.convert_to_tensor(y_train[i][np.newaxis, ...], dtype=tf.float32)

    # Random start inside the epsilon ball around the clean image.
    gen_img = clean + tf.random.uniform(clean.shape, minval=-epsilon, maxval=epsilon, dtype=tf.dtypes.float32)

    for _ in range(iterations):
        with tf.GradientTape() as tape:
            tape.watch(gen_img)
            predictions = model(gen_img)
            loss = pgd_train_loss(target, predictions)
        # Take the gradient after leaving the tape context so the gradient
        # computation itself is not recorded.
        grads = tape.gradient(loss, gen_img)
        gen_img = gen_img + (alpha * tf.sign(grads))
        # Project back into the epsilon ball around the clean image.
        gen_img = tf.clip_by_value(gen_img, clean - epsilon, clean + epsilon)

    image_count += 1
    orig_label_train.append(y_train[i])
    gen_image = np.squeeze(gen_img.numpy() * 255, axis=0)
    orig_image = x_train[i] * 255
    imwrite('/kaggle/working/'+str(image_count)+'_gen_img_train.png',np.clip(gen_image, 0, 255).astype('uint8'))
    imwrite('/kaggle/working/'+str(image_count)+'_orig_img_train.png',np.clip(orig_image, 0, 255).astype('uint8'))


In [None]:
# generated_images_train = []
# for i in range(1, len(y_train)+1):
#     x = cv2.imread('/kaggle/working/'+str(i)+'_gen_img_train.png')
#     generated_images_train.append(x)
# generated_images_train = tf.convert_to_tensor(generated_images)
# orig_label_train = tf.convert_to_tensor(orig_label_train)

In [None]:
# x_final = tf.concat([x_train, generated_images_train], 0)
# y_final = tf.concat([y_train, y_train], 0)

In [None]:
# InceptionV3 backbone (ImageNet weights, no classification head) for 227x227 RGB input.
IncV3 = InceptionV3(include_top = False, weights = "imagenet",input_shape = (227,227,3))

# Backbone followed by a dense head with dropout and a 5-way softmax output.
model_adv = Sequential([
    IncV3,
    Flatten(),
    Dense(units = 2048, activation = "relu"),
    Dropout(0.5),
    Dense(units = 5, activation = "softmax"),
])

In [None]:
# Compile with the Adam optimizer, categorical cross-entropy loss and accuracy tracking.
model_adv.compile(optimizer = "adam", loss = "categorical_crossentropy", metrics = ["accuracy"])

In [None]:
# nb_epochs = 100
# model.fit(
#     train_generator,
#     steps_per_epoch = train_generator.samples // batch_size,
#     validation_data = validation_generator, 
#     validation_steps = validation_generator.samples // batch_size,
#     epochs = nb_epochs)

In [None]:
# model_adv.fit(x_final, y_final, epochs = 100)

In [None]:
# FGSM step size used to build the adversarial training images.
e = 125/255
# To store the adversarially perturbed train images.
x_train_perturbed = []

# Iterate over every collected training sample instead of a hard-coded count,
# so the result always lines up with x_train / y_train.
for image, label in zip(x_train, y_train):
    perturbations = fgsm_untargeted(image[np.newaxis, ...], label).numpy()
    # Clip so the adversarial example stays a valid image in [0, 1].
    adversarial = tf.clip_by_value((image + perturbations * e), clip_value_min = 0, clip_value_max = 1)
    x_train_perturbed.append(adversarial.numpy())

# Each entry carries a leading batch axis of 1; collapse to x_train's shape.
x_train_perturbed = np.asarray(x_train_perturbed).reshape(x_train.shape)

In [None]:
# FGSM step size used to build the adversarial test images.
e = 125/255
x_test_perturbed = []

# Iterate over every collected validation sample instead of a hard-coded count,
# so the result always lines up with x_test / y_test.
for image, label in zip(x_test, y_test):
    perturbations = fgsm_untargeted(image[np.newaxis, ...], label).numpy()
    # Clip so the adversarial example stays a valid image in [0, 1].
    adversarial = tf.clip_by_value((image + perturbations * e), clip_value_min = 0, clip_value_max = 1)
    x_test_perturbed.append(adversarial.numpy())

# Each entry carries a leading batch axis of 1; collapse to x_test's shape.
x_test_perturbed = np.asarray(x_test_perturbed).reshape(x_test.shape)

In [None]:
# Persist the perturbed validation images so another experiment can reuse them.
np.save('perturbed_images.npy', x_test_perturbed)

In [None]:
# Training set for adversarial retraining: the clean images followed by their
# FGSM counterparts, with the labels repeated in the same order.
x_final = tf.concat([x_train, x_train_perturbed], 0)
y_final = tf.concat([y_train, y_train], 0)

In [None]:
# Train the new model on the combined clean + adversarial set for 100 epochs.
model_adv.fit(x_final, y_final, epochs = 100)

In [None]:
# Score the retrained model on the clean validation images.
model_adv.evaluate(x_test,  y_test, verbose = 2)

In [None]:
# Score the retrained model on the FGSM-perturbed validation images.
model_adv.evaluate(x_test_perturbed,  y_test, verbose = 2)