In [1]:
import os
import numpy as np
import matplotlib
matplotlib.use('Agg')           
import matplotlib.pyplot as plt
import matplotlib.gridspec as gridspec
import tensorflow as tf
from tensorflow import keras

input_shape = (28, 28, 1)
num_classes = 10
VALIDATION_SPLIT = 0.1
BATCH_SIZE = 128

In [2]:
print('\nLoading MNIST')

mnist = tf.keras.datasets.mnist
(x_train, y_train), (x_test, y_test) = mnist.load_data()

x_train = np.expand_dims(x_train, -1)
x_train = x_train.astype(np.float32) / 255
x_test = np.expand_dims(x_test, -1)
x_test = x_test.astype(np.float32) / 255

mnist_twos = x_train[np.where(y_train == 2)][:10]

#y_train = keras.utils.to_categorical(y_train, num_classes)
#y_test = keras.utils.to_categorical(y_test, num_classes)

print('\nSpliting data')

ind = np.random.permutation(x_train.shape[0])
x_train, y_train = x_train[ind], y_train[ind]
n = int(x_train.shape[0] * (1-VALIDATION_SPLIT))
x_train = x_train[:n]
y_train = y_train[:n]
x_val = x_train[n:]
y_val = y_train[n:]


Loading MNIST
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz

Spliting data


In [3]:
print(y_train.shape)
print(y_test.shape)
print('\nConstructing model')
def make_model(input_shape, num_classes):
    inputs = keras.Input(shape = input_shape)
    x = keras.layers.Conv2D(filters=32, kernel_size=(3, 3), padding='same', activation='relu')(inputs)
    x = keras.layers.MaxPooling2D(pool_size=(2, 2), strides=2)(x)
    x = keras.layers.Conv2D(filters=64, kernel_size=(3, 3), padding='same', activation='relu')(x)
    x = keras.layers.MaxPooling2D(pool_size=(2, 2), strides=2)(x)
    x = keras.layers.Flatten()(x)
    x = keras.layers.Dense(units=128, activation='relu')(x)
    x = keras.layers.Dropout(0.25)(x)
    outputs = keras.layers.Dense(units=num_classes)(x)
    #outputs = keras.layers.Activation('relu')
    return keras.Model(inputs, outputs)

(54000,)
(10000,)

Constructing model


In [9]:
model_cnn = make_model(input_shape, num_classes)
X_train = []
X_test = []
begin = 0


for i in range(0, x_train.shape[0] , 100):
  X_train.append((x_train[i:i+100],y_train[i:i+100]))
  

for i in range(0, x_test.shape[0] , 100):
  X_test.append((x_train[i:i+100],y_test[i:i+100]))
print(y_train.shape)
loss_fn = keras.losses.SparseCategoricalCrossentropy(from_logits = True)

(54000,)


In [None]:
def fgsm(model, X, y, epsilon=0.1):
    """ Construct FGSM adversarial examples on the examples X"""
    with tf.GradientTape() as tape:
      tape.watch(X)
      #prediction = model.predict(X)
      #print(type(y))
      loss = loss_fn(y,model(X))
      #print(type(loss))
    gradient = tape.gradient(loss, X)
    signed_gradient = tf.sign(gradient)
    #print(gradient.shape)
    return epsilon*signed_gradient

  
'''
def pgd_linf(model, X, y, epsilon=0.1, alpha=0.01, num_iter=20, randomize=False):
    """ Construct FGSM adversarial examples on the examples X"""
    if randomize:
        delta = np.random.randn(X.shape)
        delta = delta* 2 * epsilon - epsilon
    else:
        delta = np.zeros(X.shape)
    
    for t in range(num_iter):
      with tf.GradientTape() as tape: 
        tf.
    for t in range(num_iter):
        loss = nn.CrossEntropyLoss()(model(X + delta), y)
        loss.backward()
        delta.data = (delta + alpha*delta.grad.detach().sign()).clamp(-epsilon,epsilon)
        delta.grad.zero_()
    return delta.detach()

'''

In [47]:
def epoch(model , X, loss_fn , optimizer):
  total_loss,total_error = 0.,0.
  for x,y in X :
    with tf.GradientTape() as tape:
      yp = model(x, training = True)
      #print("yp =" , yp.shape)
      #print("y = " , y.shape)
      loss_value = loss_fn(y, yp)
      #print(type(loss_value))
    grads = tape.gradient(loss_value, model.trainable_weights)
    optimizer.apply_gradients(zip(grads, model.trainable_weights))
    total_error += np.sum(np.argmax(yp,axis = 1)!=1)
    total_loss += loss_value*len(X)
    return total_error / len(X), total_loss /len(X)

def epoch_adversarial(model, X ,loss_fn, optimizer):
  total_loss,total_error = 0.,0.
  for x,y in X :
    delta = fgsm(model, tf.convert_to_tensor(x) , y)
    with tf.GradientTape() as tape:
      yp = model(x + delta, training = True)
      loss_value = loss_fn(y, yp)
    grads = tape.gradient(loss_value, model.trainable_weights)
    optimizer.apply_gradients(zip(grads, model.trainable_weights))
    total_error += np.sum(np.argmax(yp,axis = 1)!=1)
    total_loss += loss_value*len(X)
    return total_error / len(X), total_loss /len(X) 

In [53]:
optimizer = tf.keras.optimizers.SGD(learning_rate = 0.2)
for t in range(10):
  train_err, train_loss = epoch(model_cnn, X_train,loss_fn,optimizer)
  test_err, test_loss = epoch(model_cnn, X_test,loss_fn,optimizer)
  adv_err, adv_loss = epoch_adversarial(model_cnn,X_test,loss_fn,optimizer)
  if(t == 4):
    optimizer.learning_rate = 1e-2
  print(*("{:.6f}".format(i) for i in (train_err, test_err, adv_err)), sep="\t")
  

  

0.088889	0.470000	0.460000
0.075926	0.620000	0.440000
0.074074	0.520000	0.530000
0.064815	0.600000	0.400000
0.070370	0.430000	0.370000
0.033333	0.270000	0.250000
0.027778	0.240000	0.380000
0.031481	0.180000	0.250000
0.042593	0.270000	0.240000
0.044444	0.220000	0.190000


In [33]:
model_cnn_robust = make_model(input_shape, num_classes)


In [43]:
optimizer_robust = tf.keras.optimizers.Adam(learning_rate = 0.1)
for t in range(10):
    train_err, train_loss = epoch_adversarial(model_cnn_robust, X_train,loss_fn,optimizer_robust)
    test_err, test_loss = epoch(model_cnn_robust, X_test,loss_fn,optimizer_robust)
    adv_err, adv_loss = epoch_adversarial(model_cnn_robust,X_test,loss_fn, optimizer_robust)
    if(t == 4):
      optimizer_robust.learning_rate = 1e-2
    print(*("{:.6f}".format(i) for i in (train_err, test_err, adv_err)), sep="\t")

0.177778	0.900000	0.920000
0.175926	0.730000	0.910000
0.170370	0.640000	0.740000
0.129630	0.720000	0.680000
0.135185	0.530000	0.680000
0.120370	0.390000	0.570000
0.114815	0.430000	0.650000
0.120370	0.410000	0.600000
0.092593	0.460000	0.570000
0.101852	0.560000	0.560000
