In [None]:
from test_nn import test_model
from lp import run_lp
from os import path
from spectrum_analysis import *
from utils import save_perturbed_test_groups, load_perturbed_test_groups
from utils import load_suspicious_neurons, save_suspicious_neurons
from utils import create_experiment_dir, get_trainable_layers
from utils import load_classifications, save_classifications
from utils import save_layer_outs, load_layer_outs, construct_spectrum_matrices
from utils import load_MNIST, load_CIFAR, load_model
from utils import filter_val_set, save_original_inputs
from input_synthesis import synthesize
from sklearn.model_selection import train_test_split
import datetime
import argparse
import random
from collections import defaultdict
import matplotlib.pyplot as plt
from keras import models

In [None]:
experiment_path = "experiment_results"
model_path = "neural_networks"
group_index = 1
__version__ = "v1.0"


nb_classes = 10
perturbed_count = 50

In [None]:
args =  {
    'model': 'mnist_test_model_5_30_relu',
    'suspicious_num': 10
}

args = defaultdict(lambda: None, args)

In [None]:
model_name     = args['model']
dataset        = args['dataset'] if not args['dataset'] == None else 'mnist'
step_size      = args['step_size'] if not args['step_size'] == None else 1
distance       = args['distance'] if not args['distance'] ==None else 0.1
approach       = args['approach'] if not args['approach'] == None else 'tarantula'
susp_num       = args['suspicious_num'] if not args['suspicious_num'] == None else 1
repeat         = args['repeat'] if not args['repeat'] == None else 1
seed           = args['seed'] if not args['seed'] == None else random.randint(0,10)
star           = args['star'] if not args['star'] == None else 3
logfile_name   = args['logfile'] if not args['logfile'] == None else 'result.log'

In [None]:
####################
# 0) Load MNIST or CIFAR10 data
if dataset == 'mnist':
    X_train, Y_train, X_test, Y_test = load_MNIST()
else:
    X_train, Y_train, X_test, Y_test = load_CIFAR()


X_train, X_val, Y_train, Y_val = train_test_split(X_train, Y_train,
                                                  test_size=1/6.0,
                                                  random_state=seed)

In [None]:
####################
# 1) Load the pretrained network.
try:
    model = load_model(path.join(model_path, model_name))
except:
    logfile.write("Model not found! Provide a pre-trained model model as input.")
    exit(1)

In [None]:
def exp(selected_class, X, Y):

    logfile = open(logfile_name, 'a')

    #Fault localization is done per class.
    X, Y = filter_val_set(selected_class, X, Y)

    ####################
    # 2)test the model and receive the indexes of correct and incorrect classifications
    # Also provide output of each neuron in each layer for test input x.
    correct_classifications, misclassifications, layer_outs, predictions = test_model(model, X, Y)

    ####################
    # 3) Receive the correct classifications  & misclassifications and identify 
    # the suspicious neurons per layer
    trainable_layers = get_trainable_layers(model)
    scores, num_cf, num_uf, num_cs, num_us = construct_spectrum_matrices(model,
                                                                        trainable_layers,
                                                                        correct_classifications,
                                                                        misclassifications,
                                                                        layer_outs)

    filename = experiment_path + '/' + model_name + '_C' + str(selected_class) + '_' +\
    approach +  '_SN' +  str(susp_num)

    if approach == 'tarantula':
        suspicious_neuron_idx = tarantula_analysis(trainable_layers, scores,
                                             num_cf, num_uf, num_cs, num_us,
                                             susp_num)
    else:
        print('Wrong approach')
        exit()


    logfile.write('Suspicous neurons: ' + str(suspicious_neuron_idx) + '\n')

    ####################
    # 4) Run Suspiciousness-Guided Input Synthesis Algorithm
    # Receive the set of suspicious neurons for each layer from Step 3 # and 
    # will produce new inputs based on the correct classifications (from the 
    # testing set) that exercise the suspicious neurons

    perturbed_xs = []
    perturbed_ys = []

    # select 10 inputs randomly from the correct classification set.
    #selected = list(correct_classifications)
    if perturbed_count == -1:
        selected = list(correct_classifications)
    else:
        selected = np.random.choice(list(correct_classifications), size=perturbed_count, replace=False)
        
    zipped_data = zip(list(np.array(X)[selected]), list(np.array(Y)[selected]))

    syn_start = datetime.datetime.now()
    x_perturbed, y_perturbed, x_original = synthesize(model, zipped_data,
                                           suspicious_neuron_idx,
                                           correct_classifications,
                                           step_size,
                                           distance)
    syn_end = datetime.datetime.now()

    perturbed_xs = perturbed_xs + x_perturbed
    perturbed_ys = perturbed_ys + y_perturbed

    # reshape them into the expected format
    perturbed_xs = np.asarray(perturbed_xs).reshape(np.asarray(perturbed_xs).shape[0],
                                     *X[0].shape)
 
    perturbed_ys = np.asarray(perturbed_ys).reshape(np.asarray(perturbed_ys).shape[0], 10)

    for i in range(len(perturbed_xs)):
        name = 'susp_adv_inputs/'+model_name+'_'+str(perturbed_ys[i])+'_C'+str(selected_class)+'_susp_guided_'+str(selected[i])+'.png'
        plt.imsave(name, perturbed_xs[i].reshape(28,28), cmap='gray')

    ####################
    # 5) Test if the mutated inputs are adversarial
    score = model.evaluate(perturbed_xs, perturbed_ys, verbose=0)
    logfile.write('Model: ' + model_name + ', Class: ' + str(selected_class) +
                  ', Approach: ' + approach + ', Distance: ' +
                  str(distance) + ', Score: ' + str(score) + '\n')

    logfile.write('Input Synthesis Time: ' + str(syn_end-syn_start) + '\n')

    logfile.close()
    
    return perturbed_xs, perturbed_ys, suspicious_neuron_idx

In [None]:
initial = model.evaluate(X_test, Y_test)

In [None]:
perturbed_xs_by_class, perturbed_ys_by_class, sus_ids_by_class = range(nb_classes), range(nb_classes), range(nb_classes)

for selected_class in range(nb_classes):
    perturbed_xs_by_class[selected_class], perturbed_ys_by_class[selected_class], sus_ids_by_class[selected_class] = exp(selected_class, X_val, Y_val)

for selected_class in range(nb_classes):
    model.fit(perturbed_xs_by_class[selected_class], perturbed_ys_by_class[selected_class], epochs=1)

In [None]:
a=np.concatenate(perturbed_xs_by_class)
a=np.concatenate([a, X_train])
b=np.concatenate(perturbed_ys_by_class)
b=np.concatenate([b, Y_train])

In [None]:
print(initial)

In [None]:
new_model = models.model_from_json(model.to_json())
new_model.compile(loss='categorical_crossentropy',
              optimizer='adam',
              metrics=['accuracy'])
new_model.fit(a, b, epochs=10, batch_size=32)

In [None]:
retrained_metrics = new_model.evaluate(X_test, Y_test)
print(retrained_metrics)