In [30]:
!pip install np-utils
# from kera.utils import to_categorical does not work! 



In [31]:
class bcolors:
    HEADER = '\033[95m'
    OKBLUE = '\033[94m'
    OKGREEN = '\033[92m'
    WARNING = '\033[93m'
    FAIL = '\033[91m'
    ENDC = '\033[0m'
    BOLD = '\033[1m'
    UNDERLINE = '\033[4m'

In [32]:
'''
LeNet-1
'''

# usage: python MNISTModel1.py - train the model

from __future__ import print_function

from keras.datasets import mnist
from keras.layers import Convolution2D, MaxPooling2D, Input, Dense, Activation, Flatten
from keras.models import Model
from keras.utils.np_utils import to_categorical

def Model1(input_tensor=None, train=False):
    nb_classes = 10
    # convolution kernel size
    kernel_size = (5, 5)

    if train:
        batch_size = 256
        nb_epoch = 10

        # input image dimensions
        img_rows, img_cols = 28, 28

        # the data, shuffled and split between train and test sets
        (x_train, y_train), (x_test, y_test) = mnist.load_data()

        print(x_train.shape)
        x_train = x_train.reshape(x_train.shape[0], img_rows, img_cols, 1)
        x_test = x_test.reshape(x_test.shape[0], img_rows, img_cols, 1)
        input_shape = (img_rows, img_cols, 1)

        x_train = x_train.astype('float32')
        x_test = x_test.astype('float32')
        x_train /= 255
        x_test /= 255

        # convert class vectors to binary class matrices
        y_train = to_categorical(y_train, nb_classes)
        y_test = to_categorical(y_test, nb_classes)

        input_tensor = Input(shape=input_shape)
    elif input_tensor is None:
        print(bcolors.FAIL + 'you have to proved input_tensor when testing')
        exit()

    # block1
    x = Convolution2D(4, kernel_size, activation='relu', padding='same', name='block1_conv1')(input_tensor)
    x = MaxPooling2D(pool_size=(2, 2), name='block1_pool1')(x)

    # block2
    x = Convolution2D(12, kernel_size, activation='relu', padding='same', name='block2_conv1')(x)
    x = MaxPooling2D(pool_size=(2, 2), name='block2_pool1')(x)

    x = Flatten(name='flatten')(x)
    x = Dense(nb_classes, name='before_softmax')(x)
    x = Activation('softmax', name='predictions')(x)

    model = Model(input_tensor, x)

    if train:
        # compiling
        model.compile(loss='categorical_crossentropy', optimizer='adadelta', metrics=['accuracy'])

        # trainig
        model.fit(x_train, y_train, validation_data=(x_test, y_test), batch_size=batch_size, epochs=nb_epoch, verbose=1)
        # save model
        model.save_weights('./Model1.h5')
        score = model.evaluate(x_test, y_test, verbose=0)
        print('\n')
        print('Overall Test score:', score[0])
        print('Overall Test accuracy:', score[1])
    else:
        model.load_weights('./Model1.h5')
        print(bcolors.OKBLUE + 'Model1 loaded' + bcolors.ENDC)

    return model

In [33]:
'''
LeNet-4
'''

# usage: python MNISTModel2.py - train the model

from __future__ import print_function

from keras.datasets import mnist
from keras.layers import Convolution2D, MaxPooling2D, Input, Dense, Activation, Flatten
from keras.models import Model
from keras.utils.np_utils import to_categorical

def Model2(input_tensor=None, train=False):
    nb_classes = 10
    # convolution kernel size
    kernel_size = (5, 5)

    if train:
        batch_size = 256
        nb_epoch = 10

        # input image dimensions
        img_rows, img_cols = 28, 28

        # the data, shuffled and split between train and test sets
        (x_train, y_train), (x_test, y_test) = mnist.load_data()

        x_train = x_train.reshape(x_train.shape[0], img_rows, img_cols, 1)
        x_test = x_test.reshape(x_test.shape[0], img_rows, img_cols, 1)
        input_shape = (img_rows, img_cols, 1)

        x_train = x_train.astype('float32')
        x_test = x_test.astype('float32')
        x_train /= 255
        x_test /= 255

        # convert class vectors to binary class matrices
        y_train = to_categorical(y_train, nb_classes)
        y_test = to_categorical(y_test, nb_classes)

        input_tensor = Input(shape=input_shape)
    elif input_tensor is None:
        print(bcolors.FAIL + 'you have to proved input_tensor when testing')
        exit()

    # block1
    x = Convolution2D(6, kernel_size, activation='relu', padding='same', name='block1_conv1')(input_tensor)
    x = MaxPooling2D(pool_size=(2, 2), name='block1_pool1')(x)

    # block2
    x = Convolution2D(16, kernel_size, activation='relu', padding='same', name='block2_conv1')(x)
    x = MaxPooling2D(pool_size=(2, 2), name='block2_pool1')(x)

    x = Flatten(name='flatten')(x)
    x = Dense(84, activation='relu', name='fc1')(x)
    x = Dense(nb_classes, name='before_softmax')(x)
    x = Activation('softmax', name='predictions')(x)

    model = Model(input_tensor, x)

    if train:
        # compiling
        model.compile(loss='categorical_crossentropy', optimizer='adadelta', metrics=['accuracy'])

        # trainig
        model.fit(x_train, y_train, validation_data=(x_test, y_test), batch_size=batch_size, epochs=nb_epoch, verbose=1)
        # save model
        model.save_weights('./Model2.h5')
        score = model.evaluate(x_test, y_test, verbose=0)
        print('\n')
        print('Overall Test score:', score[0])
        print('Overall Test accuracy:', score[1])
    else:
        model.load_weights('./Model2.h5')
        print(bcolors.OKBLUE + 'Model2 loaded' + bcolors.ENDC)

    return model

In [34]:
'''
LeNet-5
'''

# usage: python MNISTModel3.py - train the model

from __future__ import print_function

from keras.datasets import mnist
from keras.layers import Convolution2D, MaxPooling2D, Input, Dense, Activation, Flatten
from keras.models import Model
from keras.utils.np_utils import to_categorical


def Model3(input_tensor=None, train=False):
    nb_classes = 10
    # convolution kernel size
    kernel_size = (5, 5)

    if train:
        batch_size = 256
        nb_epoch = 10

        # input image dimensions
        img_rows, img_cols = 28, 28

        # the data, shuffled and split between train and test sets
        (x_train, y_train), (x_test, y_test) = mnist.load_data()

        x_train = x_train.reshape(x_train.shape[0], img_rows, img_cols, 1)
        x_test = x_test.reshape(x_test.shape[0], img_rows, img_cols, 1)
        input_shape = (img_rows, img_cols, 1)

        x_train = x_train.astype('float32')
        x_test = x_test.astype('float32')
        x_train /= 255
        x_test /= 255

        # convert class vectors to binary class matrices
        y_train = to_categorical(y_train, nb_classes)
        y_test = to_categorical(y_test, nb_classes)

        input_tensor = Input(shape=input_shape)
    elif input_tensor is None:
        print(bcolors.FAIL + 'you have to proved input_tensor when testing')
        exit()

    # block1
    x = Convolution2D(6, kernel_size, activation='relu', padding='same', name='block1_conv1')(input_tensor)
    x = MaxPooling2D(pool_size=(2, 2), name='block1_pool1')(x)

    # block2
    x = Convolution2D(16, kernel_size, activation='relu', padding='same', name='block2_conv1')(x)
    x = MaxPooling2D(pool_size=(2, 2), name='block2_pool1')(x)

    x = Flatten(name='flatten')(x)
    x = Dense(120, activation='relu', name='fc1')(x)
    x = Dense(84, activation='relu', name='fc2')(x)
    x = Dense(nb_classes, name='before_softmax')(x)
    x = Activation('softmax', name='predictions')(x)

    model = Model(input_tensor, x)

    if train:
        # compiling
        model.compile(loss='categorical_crossentropy', optimizer='adadelta', metrics=['accuracy'])

        # trainig
        model.fit(x_train, y_train, validation_data=(x_test, y_test), batch_size=batch_size, epochs=nb_epoch, verbose=1)
        # save model
        model.save_weights('./Model3.h5')
        score = model.evaluate(x_test, y_test, verbose=0)
        print('\n')
        print('Overall Test score:', score[0])
        print('Overall Test accuracy:', score[1])
    else:
        model.load_weights('./Model3.h5')
        print(bcolors.OKBLUE + 'Model3 loaded' + bcolors.ENDC)

    return model

In [42]:
import random
from collections import defaultdict

import numpy as np
from keras import backend as K
from keras.models import Model


# util function to convert a tensor into a valid image
def deprocess_image(x):
    x *= 255
    x = np.clip(x, 0, 255).astype('uint8')
    return x.reshape(x.shape[1], x.shape[2])  # original shape (1,img_rows, img_cols,1)


def normalize(x):
    # utility function to normalize a tensor by its L2 norm
    return x / (K.sqrt(K.mean(K.square(x))) + 1e-5)


def constraint_occl(gradients, start_point, rect_shape):
    new_grads = np.zeros_like(gradients)
    new_grads[:, start_point[0]:start_point[0] + rect_shape[0],
    start_point[1]:start_point[1] + rect_shape[1]] = gradients[:, start_point[0]:start_point[0] + rect_shape[0],
                                                     start_point[1]:start_point[1] + rect_shape[1]]
    return new_grads


def constraint_light(gradients):
    new_grads = np.ones_like(gradients)
    grad_mean = np.mean(gradients)
    return grad_mean * new_grads


def constraint_black(gradients, rect_shape=(6, 6)):
    start_point = (
        random.randint(0, gradients.shape[1] - rect_shape[0]), random.randint(0, gradients.shape[2] - rect_shape[1]))
    new_grads = np.zeros_like(gradients)
    patch = gradients[:, start_point[0]:start_point[0] + rect_shape[0], start_point[1]:start_point[1] + rect_shape[1]]
    if np.mean(patch) < 0:
        new_grads[:, start_point[0]:start_point[0] + rect_shape[0],
        start_point[1]:start_point[1] + rect_shape[1]] = -np.ones_like(patch)
    return new_grads


def init_coverage_tables(model1, model2, model3):
    model_layer_dict1 = defaultdict(bool)
    model_layer_dict2 = defaultdict(bool)
    model_layer_dict3 = defaultdict(bool)
    init_dict(model1, model_layer_dict1)
    init_dict(model2, model_layer_dict2)
    init_dict(model3, model_layer_dict3)
    return model_layer_dict1, model_layer_dict2, model_layer_dict3


def init_dict(model, model_layer_dict):
    for layer in model.layers:
        if 'flatten' in layer.name or 'input' in layer.name:
            continue
        for index in range(layer.output_shape[-1]):
            model_layer_dict[(layer.name, index)] = False


def neuron_to_cover(model_layer_dict):
    not_covered = [(layer_name, index) for (layer_name, index), v in model_layer_dict.items() if not v]
    if not_covered:
        layer_name, index = random.choice(not_covered)
    else:
        layer_name, index = random.choice(model_layer_dict.keys())
    return layer_name, index


def neuron_covered(model_layer_dict):
    covered_neurons = len([v for v in model_layer_dict.values() if v])
    total_neurons = len(model_layer_dict)
    return covered_neurons, total_neurons, covered_neurons / float(total_neurons)


def update_coverage(input_data, model, model_layer_dict, threshold=0):
    layer_names = [layer.name for layer in model.layers if
                   'flatten' not in layer.name and 'input' not in layer.name]

    intermediate_layer_model = Model(inputs=model.input,
                                     outputs=[model.get_layer(layer_name).output for layer_name in layer_names])
    intermediate_layer_outputs = intermediate_layer_model.predict(input_data)

    for i, intermediate_layer_output in enumerate(intermediate_layer_outputs):
        scaled = scale(intermediate_layer_output[0])
        for num_neuron in range(scaled.shape[-1]):
            if np.mean(scaled[..., num_neuron]) > threshold and not model_layer_dict[(layer_names[i], num_neuron)]:
                model_layer_dict[(layer_names[i], num_neuron)] = True


def full_coverage(model_layer_dict):
    if False in model_layer_dict.values():
        return False
    return True


def scale(intermediate_layer_output, rmax=1, rmin=0):
    X_std = (intermediate_layer_output - intermediate_layer_output.min()) / (
        intermediate_layer_output.max() - intermediate_layer_output.min())
    X_scaled = X_std * (rmax - rmin) + rmin
    return X_scaled


def fired(model, layer_name, index, input_data, threshold=0):
    intermediate_layer_model = Model(inputs=model.input, outputs=model.get_layer(layer_name).output)
    intermediate_layer_output = intermediate_layer_model.predict(input_data)[0]
    scaled = scale(intermediate_layer_output)
    if np.mean(scaled[..., index]) > threshold:
        return True
    return False


def diverged(predictions1, predictions2, predictions3, target):
    #     if predictions2 == predictions3 == target and predictions1 != target:
    if not predictions1 == predictions2 == predictions3:
        return True
    return False

In [47]:
from __future__ import print_function

import argparse

from keras.datasets import mnist
from keras.layers import Input
from imageio import imwrite

import os

def gen_diff(arguments_list):
    # read the parameter
    # argument parsing
    parser = argparse.ArgumentParser(description='Main function for difference-inducing input generation in MNIST dataset')
    parser.add_argument('transformation', help="realistic transformation type", choices=['light', 'occl', 'blackout'])
    parser.add_argument('weight_diff', help="weight hyperparm to control differential behavior", type=float)
    parser.add_argument('weight_nc', help="weight hyperparm to control neuron coverage", type=float)
    parser.add_argument('step', help="step size of gradient descent", type=float)
    parser.add_argument('seeds', help="number of seeds of input", type=int)
    parser.add_argument('grad_iterations', help="number of iterations of gradient descent", type=int)
    parser.add_argument('threshold', help="threshold for determining neuron activated", type=float)
    parser.add_argument('-t', '--target_model', help="target model that we want it predicts differently",
                        choices=[0, 1, 2], default=0, type=int)
    parser.add_argument('-sp', '--start_point', help="occlusion upper left corner coordinate", default=(0, 0), type=tuple)
    parser.add_argument('-occl_size', '--occlusion_size', help="occlusion size", default=(10, 10), type=tuple)

    args = parser.parse_args(arguments_list)

    # input image dimensions
    img_rows, img_cols = 28, 28
    # the data, shuffled and split between train and test sets
    (_, _), (x_test, _) = mnist.load_data()

    x_test = x_test.reshape(x_test.shape[0], img_rows, img_cols, 1)
    input_shape = (img_rows, img_cols, 1)

    x_test = x_test.astype('float32')
    x_test /= 255

    # define input tensor as a placeholder
    input_tensor = Input(shape=input_shape)

    # load multiple models sharing same input tensor
    model1 = Model1(input_tensor=input_tensor)
    model2 = Model2(input_tensor=input_tensor)
    model3 = Model3(input_tensor=input_tensor)

    # init coverage table
    model_layer_dict1, model_layer_dict2, model_layer_dict3 = init_coverage_tables(model1, model2, model3)

    # ==============================================================================================
    # start gen inputs
    for _ in range(args.seeds):
        gen_img = np.expand_dims(random.choice(x_test), axis=0)
        orig_img = gen_img.copy()
        # first check if input already induces differences
        label1, label2, label3 = np.argmax(model1.predict(gen_img)[0]), np.argmax(model2.predict(gen_img)[0]), np.argmax(
            model3.predict(gen_img)[0])

        if not label1 == label2 == label3:
            print(bcolors.OKGREEN + 'input already causes different outputs: {}, {}, {}'.format(label1, label2,
                                                                                                label3) + bcolors.ENDC)

            update_coverage(gen_img, model1, model_layer_dict1, args.threshold)
            update_coverage(gen_img, model2, model_layer_dict2, args.threshold)
            update_coverage(gen_img, model3, model_layer_dict3, args.threshold)

            print(bcolors.OKGREEN + 'covered neurons percentage %d neurons %.3f, %d neurons %.3f, %d neurons %.3f'
                  % (len(model_layer_dict1), neuron_covered(model_layer_dict1)[2], len(model_layer_dict2),
                    neuron_covered(model_layer_dict2)[2], len(model_layer_dict3),
                    neuron_covered(model_layer_dict3)[2]) + bcolors.ENDC)
            averaged_nc = (neuron_covered(model_layer_dict1)[0] + neuron_covered(model_layer_dict2)[0] +
                          neuron_covered(model_layer_dict3)[0]) / float(
                neuron_covered(model_layer_dict1)[1] + neuron_covered(model_layer_dict2)[1] +
                neuron_covered(model_layer_dict3)[
                    1])
            print(bcolors.OKGREEN + 'averaged covered neurons %.3f' % averaged_nc + bcolors.ENDC)

            gen_img_deprocessed = deprocess_image(gen_img)

            # save the result to disk

            if not os.path.isdir('./generated_inputs'):
                os.mkdir('./generated_inputs/')

            imwrite('./generated_inputs/' + 'already_differ_' + str(label1) + '_' + str(
                label2) + '_' + str(label3) + '.png', gen_img_deprocessed)
            continue

        # if all label agrees
        orig_label = label1
        layer_name1, index1 = neuron_to_cover(model_layer_dict1)
        layer_name2, index2 = neuron_to_cover(model_layer_dict2)
        layer_name3, index3 = neuron_to_cover(model_layer_dict3)

        # construct joint loss function
        if args.target_model == 0:
            loss1 = -args.weight_diff * K.mean(model1.get_layer('before_softmax').output[..., orig_label])
            loss2 = K.mean(model2.get_layer('before_softmax').output[..., orig_label])
            loss3 = K.mean(model3.get_layer('before_softmax').output[..., orig_label])
        elif args.target_model == 1:
            loss1 = K.mean(model1.get_layer('before_softmax').output[..., orig_label])
            loss2 = -args.weight_diff * K.mean(model2.get_layer('before_softmax').output[..., orig_label])
            loss3 = K.mean(model3.get_layer('before_softmax').output[..., orig_label])
        elif args.target_model == 2:
            loss1 = K.mean(model1.get_layer('before_softmax').output[..., orig_label])
            loss2 = K.mean(model2.get_layer('before_softmax').output[..., orig_label])
            loss3 = -args.weight_diff * K.mean(model3.get_layer('before_softmax').output[..., orig_label])
        loss1_neuron = K.mean(model1.get_layer(layer_name1).output[..., index1])
        loss2_neuron = K.mean(model2.get_layer(layer_name2).output[..., index2])
        loss3_neuron = K.mean(model3.get_layer(layer_name3).output[..., index3])
        layer_output = (loss1 + loss2 + loss3) + args.weight_nc * (loss1_neuron + loss2_neuron + loss3_neuron)

        # for adversarial image generation
        final_loss = K.mean(layer_output)

        # we compute the gradient of the input picture wrt this loss
        grads = normalize(K.gradients(final_loss, input_tensor)[0])

        # this function returns the loss and grads given the input picture
        iterate = K.function([input_tensor], [loss1, loss2, loss3, loss1_neuron, loss2_neuron, loss3_neuron, grads])

        # we run gradient ascent for 20 steps
        for iters in range(args.grad_iterations):
            loss_value1, loss_value2, loss_value3, loss_neuron1, loss_neuron2, loss_neuron3, grads_value = iterate(
                [gen_img])
            if args.transformation == 'light':
                grads_value = constraint_light(grads_value)  # constraint the gradients value
            elif args.transformation == 'occl':
                grads_value = constraint_occl(grads_value, args.start_point,
                                              args.occlusion_size)  # constraint the gradients value
            elif args.transformation == 'blackout':
                grads_value = constraint_black(grads_value)  # constraint the gradients value

            gen_img += grads_value * args.step
            predictions1 = np.argmax(model1.predict(gen_img)[0])
            predictions2 = np.argmax(model2.predict(gen_img)[0])
            predictions3 = np.argmax(model3.predict(gen_img)[0])

            if not predictions1 == predictions2 == predictions3:
                update_coverage(gen_img, model1, model_layer_dict1, args.threshold)
                update_coverage(gen_img, model2, model_layer_dict2, args.threshold)
                update_coverage(gen_img, model3, model_layer_dict3, args.threshold)

                print(bcolors.OKGREEN + 'covered neurons percentage %d neurons %.3f, %d neurons %.3f, %d neurons %.3f'
                      % (len(model_layer_dict1), neuron_covered(model_layer_dict1)[2], len(model_layer_dict2),
                        neuron_covered(model_layer_dict2)[2], len(model_layer_dict3),
                        neuron_covered(model_layer_dict3)[2]) + bcolors.ENDC)
                averaged_nc = (neuron_covered(model_layer_dict1)[0] + neuron_covered(model_layer_dict2)[0] +
                              neuron_covered(model_layer_dict3)[0]) / float(
                    neuron_covered(model_layer_dict1)[1] + neuron_covered(model_layer_dict2)[1] +
                    neuron_covered(model_layer_dict3)[
                        1])
                print(bcolors.OKGREEN + 'averaged covered neurons %.3f' % averaged_nc + bcolors.ENDC)

                gen_img_deprocessed = deprocess_image(gen_img)
                orig_img_deprocessed = deprocess_image(orig_img)

                # save the result to disk
                imwrite('./generated_inputs/' + args.transformation + '_' + str(predictions1) + '_' + str(
                    predictions2) + '_' + str(predictions3) + '.png',
                      gen_img_deprocessed)
                imwrite('./generated_inputs/' + args.transformation + '_' + str(predictions1) + '_' + str(
                    predictions2) + '_' + str(predictions3) + '_orig.png',
                      orig_img_deprocessed)
                break

In [37]:
Model1(train=True)
Model2(train=True)
Model3(train=True)

(60000, 28, 28)
Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


Overall Test score: 2.260607957839966
Overall Test accuracy: 0.18389999866485596
Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


Overall Test score: 2.1693663597106934
Overall Test accuracy: 0.3594000041484833
Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


Overall Test score: 2.158369302749634
Overall Test accuracy: 0.38199999928474426


<keras.engine.functional.Functional at 0x7f0e64e0a490>

In [48]:
# transformation, weight_diff, weight_nc, step, seeds, grad_iterations, threshold
gen_diff(['light', '.3', '.3', '.3', '5', '3', '0.7'])

[94mModel1 loaded[0m
[94mModel2 loaded[0m
[94mModel3 loaded[0m
[92minput already causes different outputs: 3, 6, 6[0m
[92mcovered neurons percentage 52 neurons 0.058, 148 neurons 0.081, 268 neurons 0.049[0m
[92maveraged covered neurons 0.060[0m
[92minput already causes different outputs: 9, 6, 8[0m
[92mcovered neurons percentage 52 neurons 0.096, 148 neurons 0.095, 268 neurons 0.090[0m
[92maveraged covered neurons 0.092[0m
[92minput already causes different outputs: 8, 7, 2[0m
[92mcovered neurons percentage 52 neurons 0.135, 148 neurons 0.149, 268 neurons 0.108[0m
[92maveraged covered neurons 0.124[0m
[92minput already causes different outputs: 3, 7, 2[0m
[92mcovered neurons percentage 52 neurons 0.135, 148 neurons 0.196, 268 neurons 0.134[0m
[92maveraged covered neurons 0.154[0m
[92minput already causes different outputs: 3, 8, 2[0m
[92mcovered neurons percentage 52 neurons 0.135, 148 neurons 0.209, 268 neurons 0.153[0m
[92maveraged covered neurons 0