Load required packages (Keras, tensorflow-gpu, numpy, opencv-python, matplotlib (for plotting))

In [None]:
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import matplotlib.pyplot as plt
import cv2
import numpy as np
import tensorflow as tf
import Structural_Perturbations as SP
tf.logging.set_verbosity(tf.logging.ERROR)
# import os
# os.environ["CUDA_VISIBLE_DEVICES"]="3"

Load Dataset, specify hyperparameters such as batch_size, epochs

In [None]:
import keras
from keras.models import Sequential, load_model
from keras.layers import Dense, Dropout, Flatten
from keras.layers import Conv2D, MaxPooling2D
from keras import backend as K

"""Specifies batch size, number of classes in data and number of epochs"""
# Training hyper-parameters: mini-batch size, number of target classes and
# number of passes over the training set.
batch_size = 128
num_classes = 10
epochs = 10

# Input image dimensions (rows, columns).
img_rows = 28
img_cols = 28

# Load/download the dataset through the project helper; it yields the
# training images/labels followed by the evaluation images/labels.
from keras.datasets import mnist
train_data, train_labels, eval_data, eval_labels = SP.load('mnist')

Defines model architecture, model evaluation function, training function, derivative function

In [None]:
"""Defines architecture of the model. We use a 2 conv-layer followed by a fully-connected layer with 
   max-pooling and dropout in between layers. Loss is categorical cross-entropy and optimiser used is Adam"""
def new_model():
    """Build and compile a fresh, untrained CNN classifier.

    Architecture, in order: a 32-filter and a 64-filter 3x3 ReLU convolution,
    2x2 max-pooling, dropout 0.25, flatten, a 128-unit ReLU dense layer,
    dropout 0.5 and a softmax output layer with ``num_classes`` units.  The
    model is compiled with categorical cross-entropy loss, the Adam optimiser
    and an accuracy metric.

    Reads the module-level ``input_shape`` and ``num_classes``.

    Returns:
        The compiled ``Sequential`` model.
    """
    stack = [
        Conv2D(
            32, kernel_size=(3, 3), activation='relu',
            input_shape=input_shape),
        Conv2D(64, (3, 3), activation='relu'),
        MaxPooling2D(pool_size=(2, 2)),
        Dropout(0.25),
        Flatten(),
        Dense(128, activation='relu'),
        Dropout(0.5),
        Dense(num_classes, activation='softmax'),
    ]

    network = Sequential()
    for layer in stack:
        network.add(layer)

    network.compile(
        loss=keras.losses.categorical_crossentropy,
        optimizer=keras.optimizers.Adam(),
        metrics=['accuracy'])
    return network


"""Evaluates a given model over a range of perturbation
   Local Variables : 
   
   res : stores the perturbation and accuracy of the model on that perturbation
   df : stores the values of the derivative of the accuracy vs perturbation array
   model : the loaded model
   base : baseline accuracy of the model
   name : name of the perturbation
   alpha : the array of perturbation to check
   d : the original unperturbed dataset"""
def evaluate(model, base, d, name, alpha):
    """Score the model over a range of perturbations and pick a training point.

    For every value in ``alpha`` a copy of ``d`` is perturbed with
    ``SP.Transform(name, copy, value)``, split into images (the first
    ``img_rows * img_cols`` columns) and labels (the last column) and scored
    with ``model.evaluate``.  Entry ``[1]`` of the score is taken as the
    accuracy (assumes the model was compiled with accuracy as its first
    metric -- TODO confirm for models loaded from disk).

    Reads the module-level ``epsilon`` (tolerated accuracy drop),
    ``num_classes``, ``img_rows`` and ``img_cols``.

    Args:
        model: Model exposing a Keras-style ``evaluate`` method.
        base: Baseline accuracy the drops are measured against.
        d: 2-D array whose rows are a flattened image followed by its label.
        name: Perturbation name, passed through to ``SP.Transform``.
        alpha: Sequence of perturbation values to test.

    Returns:
        A pair ``(point, res)`` where ``res`` holds ``[perturbation,
        accuracy]`` rows.

        * If no accuracy is more than ``epsilon`` below ``base``, ``point``
          is ``max(alpha) + 1`` (an out-of-range value the caller uses as a
          stop signal) and ``res`` is in the order of ``alpha``.
        * Otherwise ``res`` is sorted by perturbation and ``point`` is the
          perturbation with the largest accuracy change to its neighbour
          whose own accuracy is more than ``epsilon`` below ``base``.  If no
          such candidate exists, the perturbation with the lowest accuracy
          is returned instead.
    """
    print("Evaluating", alpha)
    res = []
    for i in alpha:
        # Work on a copy so the caller's array is never perturbed.
        data = SP.Transform(name, np.copy(d), i)
        labels = keras.utils.to_categorical(data[:, -1], num_classes)
        images = data[:, 0:img_rows * img_cols].reshape(
            -1, img_rows, img_cols, 1)
        accuracy = model.evaluate(
            images, labels, verbose=0, batch_size=2048)[1]
        res.append([i, accuracy])
    res = np.array(res)

    if base - min(res[:, 1]) > epsilon:
        res = res[np.lexsort(np.fliplr(res).T)]
        df = derivative(res)
        # A single tested perturbation yields no derivative rows at all.
        changes = df[:, 1] if df.ndim == 2 else np.empty(0)
        # Visit EVERY candidate once, largest change first; the stable sort
        # keeps the smaller perturbation first among equal changes.
        for row in np.argsort(-changes, kind='mergesort'):
            candidate = df[row, 0]
            if base - res[:, 1][res[:, 0] == candidate][0] > epsilon:
                return candidate, res
        # No steep point lies below the threshold (e.g. only the last tested
        # perturbation fails): fall back to the worst-performing point, which
        # the enclosing condition guarantees is below the threshold.
        return res[np.argmin(res[:, 1]), 0], res
    else:
        return max(alpha) + 1, res

"""Perturbs training dataset symmetrically and trains a given model on a perturbed dataset. Returns model"""
def train_model(name, d, alpha):
    """Train a fresh model on ``d`` plus perturbed copies of ``d``.

    The mirror image of the most recent point ``alpha[-1]`` is appended to
    ``alpha`` IN PLACE, so the caller's list grows as well: ``56 - last`` for
    'Perspective', ``2 - last`` for "Scaling" and ``-last`` for anything
    else.  Every entry of ``alpha`` that differs from ``alpha[0]`` is then
    applied to ``d`` through ``SP.Transform`` and stacked below a copy of
    ``d`` taken before any transform ran.

    Args:
        name: Perturbation name, passed through to ``SP.Transform``.
        d: 2-D array whose rows are a flattened image followed by its label.
        alpha: Mutable list of perturbation points; ``alpha[0]`` is treated
            as the unperturbed setting and is skipped.

    Returns:
        The model produced by ``new_model()`` after fitting on the stacked
        data for ``epochs`` epochs with batches of ``batch_size``.
    """
    training_set = np.copy(d)

    latest = alpha[-1]
    if name == 'Perspective':
        mirrored = 56 - latest
    elif name == "Scaling":
        mirrored = 2 - latest
    else:
        mirrored = -latest
    alpha.append(mirrored)
    print("To be trained on ", alpha)

    for amount in alpha:
        if amount == alpha[0]:
            # The unperturbed data is already present as the first chunk.
            continue
        shifted = SP.Transform(name, d, amount)
        training_set = np.concatenate((training_set, shifted))

    network = new_model()
    targets = keras.utils.to_categorical(training_set[:, -1], 10)
    inputs = training_set[:, 0:784].reshape(-1, 28, 28, 1)
    network.fit(
        inputs, targets, batch_size=batch_size, epochs=epochs, verbose=1)
    return network

"""Loads data and stacks the labels as a column of data. This is required for multiprocessing."""
def load_data():
    """Load the 'mnist' splits and append each label to its flattened image.

    Images are reshaped to rows of 784 values and labels to a single column,
    then joined side by side so one row carries an image and its label.

    Returns:
        A tuple ``(train, test, input_shape)`` where ``train`` and ``test``
        are the joined arrays and ``input_shape`` is
        ``(img_rows, img_cols, 1)``.
    """
    images_tr, labels_tr, images_ev, labels_ev = SP.load('mnist')

    train = np.hstack(
        (images_tr.reshape(-1, 784), labels_tr.reshape(-1, 1)))
    test = np.hstack(
        (images_ev.reshape(-1, 784), labels_ev.reshape(-1, 1)))

    return (train, test, (img_rows, img_cols, 1))

"""Return the derivative array"""
def derivative(res, width=1):
    """Return the absolute accuracy change between neighbouring perturbations.

    ``res`` is sorted by its first column (ties broken by the second) and,
    for each row except the last, the absolute difference between the next
    row's second column and this row's second column is computed.

    Args:
        res: Array-like of ``[perturbation, accuracy]`` rows (at least one).
        width: Unused; kept so existing callers remain valid.

    Returns:
        An array of shape ``(len(res) - 1, 2)`` with rows
        ``[perturbation, abs(change to the next row)]``.  A single-row input
        yields an empty ``(0, 2)`` array, so column indexing such as
        ``out[:, 1]`` is always valid.
    """
    res = np.array(res)
    res = res[np.lexsort(np.fliplr(res).T)]
    changes = np.abs(np.diff(res[:, 1]))
    return np.column_stack((res[:-1, 0], changes))

In [None]:
"""Loads the data and Vanilla model"""
# Build the image+label arrays and the network input shape, then restore the
# saved vanilla model from disk.
train, test, input_shape = load_data()
model = load_model("../../MNIST/MNIST_vanilla.h5")

In [None]:
"""Finds the baseline accuracy of the vanilla model.
   Variables:
   
   res : Stores the results array of accuracy vs perturbation
   
   points : Initialize with the initial (natural) perturbation.
            Scaling : 1
            Perspective : 28
            Else : 0
            Stores the points on which derivative was maximum and hence needs training.
            
   epsilon : Defines our threshold. 0.1 = 10% accuracy drop
   
   max_perturbation : Stores the maximum allowed perturbation
   
   name : Name of perturbation {Exposure, Scaling, Rotation, Translation, Shear, Perspective}
   
   x : step_length
   
   set_to_test : the array of perturbations to test on, in steps of x.
                 Scaling ranges from 2-max to max (i.e. from 0 when max is 2).
                 Perspective ranges from 56-max to max (symmetric about 28).
                 Others range from -max to +max"""

# Score the loaded model on the unperturbed training images; entry [1] of the
# result is used as the baseline accuracy.
base = model.evaluate(
    train_data.reshape(-1, 28, 28, 1),
    keras.utils.to_categorical(train_labels, 10),
    verbose=0)[1]
res = []
epsilon = 0.1
print("Base Accuracy is ",base)

# Make sure to change these three variables appropriately.
name = "Rotation"
max_perturbation = 90
x = 10

# Choose the natural (unperturbed) starting point and the grid of
# perturbations to test; the grid's lower bound depends on the perturbation.
if name == "Scaling":
    points = [1]
    set_to_test = np.arange(2 - max_perturbation, max_perturbation + x, x)
elif name == "Perspective":
    points = [28]
    set_to_test = np.arange(56 - max_perturbation, max_perturbation + x, x)
else:
    points = [0]
    set_to_test = np.arange(-max_perturbation, max_perturbation + x, x)

In [None]:
"""The main loop. It continues til the model has >= accuracy than threshold on all pertubations in range"""

# Alternate between evaluating and retraining until evaluate() reports a
# point beyond max_perturbation, which is its "nothing left to fix" signal.
while True:
    i, r = evaluate(model, base, train, name, set_to_test)
    if i > max_perturbation:
        break
    res.append(r)
    points.append(i)
    # train_model() also appends the mirrored point to `points` in place.
    model = train_model(name, train, points)
    print("New model created")
print("Model is robust")

# Freeze the bookkeeping lists into arrays for plotting and saving.
points = np.array(points).round(decimals=2)
res = np.array(res)

In [None]:
"""Test the robust model on test data over same pertubation range. Vertical lines show the training points"""
# Re-score the final model on the test data over the same perturbation grid.
r = evaluate(model, base, test, name, set_to_test)[1]

# Axis labels and a fixed accuracy range.
plt.xlabel(name, fontsize=18)
plt.ylabel("Accuracy", fontsize=18)
plt.ylim(0, 1.1)

# One thin red vertical line per training point, then the accuracy curve.
for i in points:
    plt.axvline(i, c='red', linewidth=0.5)
plt.plot(r[:, 0], r[:, 1])
plt.show()

In [None]:
"""Save the model, results array and array of points"""
# Write the final model, the per-iteration results and the training points
# to files named after the perturbation.
model.save("MNIST_" + name + ".h5")
np.save("MNIST_" + name + "_res.npy", res)
np.save("MNIST_" + name + "_points.npy", points)